From dde17518ff7f3dd3fe1d53614f211357944516f0 Mon Sep 17 00:00:00 2001 From: krehermann <16602512+krehermann@users.noreply.github.com> Date: Thu, 12 Dec 2024 08:20:18 -0700 Subject: [PATCH 1/5] refactor helper to use in cli in CLD (#15647) * refactor helper to use in cli in CLD * cleanup cfg and validation * fix tests * parallel ccip tests * refactor mcms utils * ccip wait timeout tests * fix oversights --- .../ccip/changeset/accept_ownership_test.go | 8 +- .../ccip/changeset/cs_add_chain_test.go | 14 +- deployment/ccip/changeset/cs_add_lane_test.go | 1 + .../ccip/changeset/cs_ccip_home_test.go | 22 +- .../ccip/changeset/cs_deploy_chain_test.go | 11 +- .../ccip/changeset/cs_home_chain_test.go | 1 + .../changeset/cs_initial_add_chain_test.go | 6 +- deployment/ccip/changeset/cs_jobspec_test.go | 1 + .../ccip/changeset/cs_update_rmn_config.go | 7 +- .../changeset/cs_update_rmn_config_test.go | 1 + deployment/ccip/changeset/test_assertions.go | 4 +- deployment/ccip/changeset/test_environment.go | 12 +- deployment/ccip/changeset/view_test.go | 1 + .../common/changeset/internal/mcms_test.go | 11 +- deployment/common/changeset/state.go | 96 +----- deployment/common/changeset/test_helpers.go | 8 +- .../transfer_to_mcms_with_timelock_test.go | 12 +- .../common/proposalutils/mcms_helpers.go | 273 ++++++++++++++++++ .../mcms_test_helpers.go | 67 ++--- .../changeset/accept_ownership_test.go | 13 +- .../append_node_capabilities_test.go | 3 +- .../changeset/deploy_forwarder_test.go | 5 +- .../keystone/changeset/deploy_ocr3_test.go | 3 +- deployment/keystone/changeset/helpers_test.go | 11 +- .../keystone/changeset/update_don_test.go | 3 +- .../update_node_capabilities_test.go | 3 +- .../keystone/changeset/update_nodes_test.go | 3 +- 27 files changed, 375 insertions(+), 225 deletions(-) create mode 100644 deployment/common/proposalutils/mcms_helpers.go rename deployment/common/{changeset => proposalutils}/mcms_test_helpers.go (54%) diff --git a/deployment/ccip/changeset/accept_ownership_test.go b/deployment/ccip/changeset/accept_ownership_test.go index 5580b31a85a..1dbef8e7a0b 100644 --- a/deployment/ccip/changeset/accept_ownership_test.go +++ b/deployment/ccip/changeset/accept_ownership_test.go @@ -9,9 +9,11 @@ import ( "golang.org/x/exp/maps" commonchangeset "github.com/smartcontractkit/chainlink/deployment/common/changeset" + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" ) func Test_NewAcceptOwnershipChangeset(t *testing.T) { + t.Parallel() e := NewMemoryEnvironment(t) state, err := LoadOnchainState(e.Env) require.NoError(t, err) @@ -20,12 +22,12 @@ func Test_NewAcceptOwnershipChangeset(t *testing.T) { source := allChains[0] dest := allChains[1] - timelockContracts := map[uint64]*commonchangeset.TimelockExecutionContracts{ - source: &commonchangeset.TimelockExecutionContracts{ + timelockContracts := map[uint64]*proposalutils.TimelockExecutionContracts{ + source: &proposalutils.TimelockExecutionContracts{ Timelock: state.Chains[source].Timelock, CallProxy: state.Chains[source].CallProxy, }, - dest: &commonchangeset.TimelockExecutionContracts{ + dest: &proposalutils.TimelockExecutionContracts{ Timelock: state.Chains[dest].Timelock, CallProxy: state.Chains[dest].CallProxy, }, diff --git a/deployment/ccip/changeset/cs_add_chain_test.go b/deployment/ccip/changeset/cs_add_chain_test.go index b21d7411ce7..96b77f1bd7d 100644 --- a/deployment/ccip/changeset/cs_add_chain_test.go +++ b/deployment/ccip/changeset/cs_add_chain_test.go @@ -1,12 +1,12 @@ package changeset import ( - "math/big" "testing" "time" "github.com/smartcontractkit/chainlink/deployment/ccip/changeset/internal" commonchangeset "github.com/smartcontractkit/chainlink/deployment/common/changeset" + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" commontypes "github.com/smartcontractkit/chainlink/deployment/common/types" "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/types" @@ -30,6 +30,7 @@ import ( ) func TestAddChainInbound(t *testing.T) { + t.Parallel() // 4 chains where the 4th is added after initial deployment. e := NewMemoryEnvironment(t, WithChains(4), @@ -46,12 +47,7 @@ func TestAddChainInbound(t *testing.T) { require.NoError(t, err) require.NoError(t, e.Env.ExistingAddresses.Merge(newAddresses)) - cfg := commontypes.MCMSWithTimelockConfig{ - Canceller: commonchangeset.SingleGroupMCMS(t), - Bypasser: commonchangeset.SingleGroupMCMS(t), - Proposer: commonchangeset.SingleGroupMCMS(t), - TimelockMinDelay: big.NewInt(0), - } + cfg := proposalutils.SingleGroupTimelockConfig(t) e.Env, err = commonchangeset.ApplyChangesets(t, e.Env, nil, []commonchangeset.ChangesetApplication{ { Changeset: commonchangeset.WrapChangeSet(commonchangeset.DeployLinkToken), @@ -152,7 +148,7 @@ func TestAddChainInbound(t *testing.T) { } // transfer ownership to timelock - _, err = commonchangeset.ApplyChangesets(t, e.Env, map[uint64]*commonchangeset.TimelockExecutionContracts{ + _, err = commonchangeset.ApplyChangesets(t, e.Env, map[uint64]*proposalutils.TimelockExecutionContracts{ initialDeploy[0]: { Timelock: state.Chains[initialDeploy[0]].Timelock, CallProxy: state.Chains[initialDeploy[0]].CallProxy, @@ -194,7 +190,7 @@ func TestAddChainInbound(t *testing.T) { nodeIDs = append(nodeIDs, node.NodeID) } - _, err = commonchangeset.ApplyChangesets(t, e.Env, map[uint64]*commonchangeset.TimelockExecutionContracts{ + _, err = commonchangeset.ApplyChangesets(t, e.Env, map[uint64]*proposalutils.TimelockExecutionContracts{ e.HomeChainSel: { Timelock: state.Chains[e.HomeChainSel].Timelock, CallProxy: state.Chains[e.HomeChainSel].CallProxy, diff --git a/deployment/ccip/changeset/cs_add_lane_test.go b/deployment/ccip/changeset/cs_add_lane_test.go index 7f1374a1725..5c324c975ef 100644 --- a/deployment/ccip/changeset/cs_add_lane_test.go +++ b/deployment/ccip/changeset/cs_add_lane_test.go @@ -16,6 +16,7 @@ import ( ) func TestAddLanesWithTestRouter(t *testing.T) { + t.Parallel() e := NewMemoryEnvironment(t) // Here we have CR + nodes set up, but no CCIP contracts deployed. state, err := LoadOnchainState(e.Env) diff --git a/deployment/ccip/changeset/cs_ccip_home_test.go b/deployment/ccip/changeset/cs_ccip_home_test.go index 92784551957..47f262d3f83 100644 --- a/deployment/ccip/changeset/cs_ccip_home_test.go +++ b/deployment/ccip/changeset/cs_ccip_home_test.go @@ -27,7 +27,7 @@ import ( func TestActiveCandidate(t *testing.T) { t.Skipf("to be enabled after latest cl-ccip is compatible") - + t.Parallel() tenv := NewMemoryEnvironment(t, WithChains(3), WithNodes(5)) @@ -86,9 +86,9 @@ func TestActiveCandidate(t *testing.T) { ConfirmExecWithSeqNrsForAll(t, e, state, expectedSeqNumExec, startBlocks) // compose the transfer ownership and accept ownership changesets - timelockContracts := make(map[uint64]*commonchangeset.TimelockExecutionContracts) + timelockContracts := make(map[uint64]*proposalutils.TimelockExecutionContracts) for _, chain := range allChains { - timelockContracts[chain] = &commonchangeset.TimelockExecutionContracts{ + timelockContracts[chain] = &proposalutils.TimelockExecutionContracts{ Timelock: state.Chains[chain].Timelock, CallProxy: state.Chains[chain].CallProxy, } @@ -176,8 +176,8 @@ func TestActiveCandidate(t *testing.T) { Batch: setCommitCandidateOp, }}, "set new candidates on commit plugin", 0) require.NoError(t, err) - setCommitCandidateSigned := commonchangeset.SignProposal(t, e, setCommitCandidateProposal) - commonchangeset.ExecuteProposal(t, e, setCommitCandidateSigned, &commonchangeset.TimelockExecutionContracts{ + setCommitCandidateSigned := proposalutils.SignProposal(t, e, setCommitCandidateProposal) + proposalutils.ExecuteProposal(t, e, setCommitCandidateSigned, &proposalutils.TimelockExecutionContracts{ Timelock: state.Chains[tenv.HomeChainSel].Timelock, CallProxy: state.Chains[tenv.HomeChainSel].CallProxy, }, tenv.HomeChainSel) @@ -197,8 +197,8 @@ func TestActiveCandidate(t *testing.T) { Batch: setExecCandidateOp, }}, "set new candidates on commit and exec plugins", 0) require.NoError(t, err) - setExecCandidateSigned := commonchangeset.SignProposal(t, e, setExecCandidateProposal) - commonchangeset.ExecuteProposal(t, e, setExecCandidateSigned, &commonchangeset.TimelockExecutionContracts{ + setExecCandidateSigned := proposalutils.SignProposal(t, e, setExecCandidateProposal) + proposalutils.ExecuteProposal(t, e, setExecCandidateSigned, &proposalutils.TimelockExecutionContracts{ Timelock: state.Chains[tenv.HomeChainSel].Timelock, CallProxy: state.Chains[tenv.HomeChainSel].CallProxy, }, tenv.HomeChainSel) @@ -234,8 +234,8 @@ func TestActiveCandidate(t *testing.T) { Batch: promoteOps, }}, "promote candidates and revoke actives", 0) require.NoError(t, err) - promoteSigned := commonchangeset.SignProposal(t, e, promoteProposal) - commonchangeset.ExecuteProposal(t, e, promoteSigned, &commonchangeset.TimelockExecutionContracts{ + promoteSigned := proposalutils.SignProposal(t, e, promoteProposal) + proposalutils.ExecuteProposal(t, e, promoteSigned, &proposalutils.TimelockExecutionContracts{ Timelock: state.Chains[tenv.HomeChainSel].Timelock, CallProxy: state.Chains[tenv.HomeChainSel].CallProxy, }, tenv.HomeChainSel) @@ -298,7 +298,7 @@ func Test_PromoteCandidate(t *testing.T) { if tc.mcmsEnabled { // Transfer ownership to timelock so that we can promote the zero digest later down the line. - _, err = commonchangeset.ApplyChangesets(t, tenv.Env, map[uint64]*commonchangeset.TimelockExecutionContracts{ + _, err = commonchangeset.ApplyChangesets(t, tenv.Env, map[uint64]*proposalutils.TimelockExecutionContracts{ source: { Timelock: state.Chains[source].Timelock, CallProxy: state.Chains[source].CallProxy, @@ -345,7 +345,7 @@ func Test_PromoteCandidate(t *testing.T) { MinDelay: 0, } } - _, err = commonchangeset.ApplyChangesets(t, tenv.Env, map[uint64]*commonchangeset.TimelockExecutionContracts{ + _, err = commonchangeset.ApplyChangesets(t, tenv.Env, map[uint64]*proposalutils.TimelockExecutionContracts{ tenv.HomeChainSel: { Timelock: state.Chains[tenv.HomeChainSel].Timelock, CallProxy: state.Chains[tenv.HomeChainSel].CallProxy, diff --git a/deployment/ccip/changeset/cs_deploy_chain_test.go b/deployment/ccip/changeset/cs_deploy_chain_test.go index fbf9c881138..9e1a581112d 100644 --- a/deployment/ccip/changeset/cs_deploy_chain_test.go +++ b/deployment/ccip/changeset/cs_deploy_chain_test.go @@ -3,7 +3,6 @@ package changeset import ( "encoding/json" "fmt" - "math/big" "testing" "github.com/stretchr/testify/require" @@ -11,12 +10,14 @@ import ( "github.com/smartcontractkit/chainlink/deployment" commonchangeset "github.com/smartcontractkit/chainlink/deployment/common/changeset" + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" commontypes "github.com/smartcontractkit/chainlink/deployment/common/types" "github.com/smartcontractkit/chainlink/deployment/environment/memory" "github.com/smartcontractkit/chainlink/v2/core/logger" ) func TestDeployChainContractsChangeset(t *testing.T) { + t.Parallel() lggr := logger.TestLogger(t) e := memory.NewMemoryEnvironment(t, lggr, zapcore.InfoLevel, memory.MemoryEnvironmentConfig{ Bootstraps: 1, @@ -30,12 +31,7 @@ func TestDeployChainContractsChangeset(t *testing.T) { p2pIds := nodes.NonBootstraps().PeerIDs() cfg := make(map[uint64]commontypes.MCMSWithTimelockConfig) for _, chain := range e.AllChainSelectors() { - cfg[chain] = commontypes.MCMSWithTimelockConfig{ - Canceller: commonchangeset.SingleGroupMCMS(t), - Bypasser: commonchangeset.SingleGroupMCMS(t), - Proposer: commonchangeset.SingleGroupMCMS(t), - TimelockMinDelay: big.NewInt(0), - } + cfg[chain] = proposalutils.SingleGroupTimelockConfig(t) } e, err = commonchangeset.ApplyChangesets(t, e, nil, []commonchangeset.ChangesetApplication{ { @@ -98,6 +94,7 @@ func TestDeployChainContractsChangeset(t *testing.T) { } func TestDeployCCIPContracts(t *testing.T) { + t.Parallel() e := NewMemoryEnvironment(t) // Deploy all the CCIP contracts. state, err := LoadOnchainState(e.Env) diff --git a/deployment/ccip/changeset/cs_home_chain_test.go b/deployment/ccip/changeset/cs_home_chain_test.go index a06161f7086..eb620691db0 100644 --- a/deployment/ccip/changeset/cs_home_chain_test.go +++ b/deployment/ccip/changeset/cs_home_chain_test.go @@ -13,6 +13,7 @@ import ( ) func TestDeployHomeChain(t *testing.T) { + t.Parallel() lggr := logger.TestLogger(t) e := memory.NewMemoryEnvironment(t, lggr, zapcore.InfoLevel, memory.MemoryEnvironmentConfig{ Bootstraps: 1, diff --git a/deployment/ccip/changeset/cs_initial_add_chain_test.go b/deployment/ccip/changeset/cs_initial_add_chain_test.go index c1404eb7123..f344068f11b 100644 --- a/deployment/ccip/changeset/cs_initial_add_chain_test.go +++ b/deployment/ccip/changeset/cs_initial_add_chain_test.go @@ -9,10 +9,12 @@ import ( "github.com/stretchr/testify/require" commonchangeset "github.com/smartcontractkit/chainlink/deployment/common/changeset" + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/router" ) func TestInitialAddChainAppliedTwice(t *testing.T) { + t.Parallel() // This already applies the initial add chain changeset. e := NewMemoryEnvironment(t) @@ -24,10 +26,10 @@ func TestInitialAddChainAppliedTwice(t *testing.T) { allChains := e.Env.AllChainSelectors() tokenConfig := NewTestTokenConfig(state.Chains[e.FeedChainSel].USDFeeds) chainConfigs := make(map[uint64]CCIPOCRParams) - timelockContractsPerChain := make(map[uint64]*commonchangeset.TimelockExecutionContracts) + timelockContractsPerChain := make(map[uint64]*proposalutils.TimelockExecutionContracts) for _, chain := range allChains { - timelockContractsPerChain[chain] = &commonchangeset.TimelockExecutionContracts{ + timelockContractsPerChain[chain] = &proposalutils.TimelockExecutionContracts{ Timelock: state.Chains[chain].Timelock, CallProxy: state.Chains[chain].CallProxy, } diff --git a/deployment/ccip/changeset/cs_jobspec_test.go b/deployment/ccip/changeset/cs_jobspec_test.go index 21e80e85aa2..a0445b0d5ee 100644 --- a/deployment/ccip/changeset/cs_jobspec_test.go +++ b/deployment/ccip/changeset/cs_jobspec_test.go @@ -13,6 +13,7 @@ import ( ) func TestJobSpecChangeset(t *testing.T) { + t.Parallel() lggr := logger.TestLogger(t) e := memory.NewMemoryEnvironment(t, lggr, zapcore.InfoLevel, memory.MemoryEnvironmentConfig{ Chains: 1, diff --git a/deployment/ccip/changeset/cs_update_rmn_config.go b/deployment/ccip/changeset/cs_update_rmn_config.go index 25ae8308eb5..42eace928c3 100644 --- a/deployment/ccip/changeset/cs_update_rmn_config.go +++ b/deployment/ccip/changeset/cs_update_rmn_config.go @@ -12,7 +12,6 @@ import ( "github.com/smartcontractkit/ccip-owner-contracts/pkg/proposal/mcms" "github.com/smartcontractkit/ccip-owner-contracts/pkg/proposal/timelock" "github.com/smartcontractkit/chainlink/deployment" - commonchangeset "github.com/smartcontractkit/chainlink/deployment/common/changeset" "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/rmn_home" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/rmn_remote" @@ -304,10 +303,10 @@ func NewPromoteCandidateConfigChangeset(e deployment.Environment, config Promote }, nil } -func buildTimelockPerChain(e deployment.Environment, state CCIPOnChainState) map[uint64]*commonchangeset.TimelockExecutionContracts { - timelocksPerChain := make(map[uint64]*commonchangeset.TimelockExecutionContracts) +func buildTimelockPerChain(e deployment.Environment, state CCIPOnChainState) map[uint64]*proposalutils.TimelockExecutionContracts { + timelocksPerChain := make(map[uint64]*proposalutils.TimelockExecutionContracts) for _, chain := range e.Chains { - timelocksPerChain[chain.Selector] = &commonchangeset.TimelockExecutionContracts{ + timelocksPerChain[chain.Selector] = &proposalutils.TimelockExecutionContracts{ Timelock: state.Chains[chain.Selector].Timelock, CallProxy: state.Chains[chain.Selector].CallProxy, } diff --git a/deployment/ccip/changeset/cs_update_rmn_config_test.go b/deployment/ccip/changeset/cs_update_rmn_config_test.go index 3ec309182aa..bab70f68fb5 100644 --- a/deployment/ccip/changeset/cs_update_rmn_config_test.go +++ b/deployment/ccip/changeset/cs_update_rmn_config_test.go @@ -56,6 +56,7 @@ func TestUpdateRMNConfig(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + t.Parallel() updateRMNConfig(t, tc) }) } diff --git a/deployment/ccip/changeset/test_assertions.go b/deployment/ccip/changeset/test_assertions.go index c0b510acc07..a114e52b361 100644 --- a/deployment/ccip/changeset/test_assertions.go +++ b/deployment/ccip/changeset/test_assertions.go @@ -221,8 +221,8 @@ func ConfirmCommitForAllWithExpectedSeqNums( return false } }, - 3*time.Minute, - 1*time.Second, + tests.WaitTimeout(t), + 2*time.Second, "all commitments did not confirm", ) } diff --git a/deployment/ccip/changeset/test_environment.go b/deployment/ccip/changeset/test_environment.go index ede078254c2..0efa44d108c 100644 --- a/deployment/ccip/changeset/test_environment.go +++ b/deployment/ccip/changeset/test_environment.go @@ -20,6 +20,7 @@ import ( "github.com/smartcontractkit/chainlink/deployment" commonchangeset "github.com/smartcontractkit/chainlink/deployment/common/changeset" + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" commontypes "github.com/smartcontractkit/chainlink/deployment/common/types" "github.com/smartcontractkit/chainlink/deployment/environment/memory" ) @@ -299,12 +300,7 @@ func NewEnvironmentWithJobsAndContracts(t *testing.T, tc *TestConfigs, tEnv Test mcmsCfg := make(map[uint64]commontypes.MCMSWithTimelockConfig) for _, c := range e.Env.AllChainSelectors() { - mcmsCfg[c] = commontypes.MCMSWithTimelockConfig{ - Canceller: commonchangeset.SingleGroupMCMS(t), - Bypasser: commonchangeset.SingleGroupMCMS(t), - Proposer: commonchangeset.SingleGroupMCMS(t), - TimelockMinDelay: big.NewInt(0), - } + mcmsCfg[c] = proposalutils.SingleGroupTimelockConfig(t) } var ( usdcChains []uint64 @@ -382,9 +378,9 @@ func NewEnvironmentWithJobsAndContracts(t *testing.T, tc *TestConfigs, tEnv Test } // Build the per chain config. chainConfigs := make(map[uint64]CCIPOCRParams) - timelockContractsPerChain := make(map[uint64]*commonchangeset.TimelockExecutionContracts) + timelockContractsPerChain := make(map[uint64]*proposalutils.TimelockExecutionContracts) for _, chain := range allChains { - timelockContractsPerChain[chain] = &commonchangeset.TimelockExecutionContracts{ + timelockContractsPerChain[chain] = &proposalutils.TimelockExecutionContracts{ Timelock: state.Chains[chain].Timelock, CallProxy: state.Chains[chain].CallProxy, } diff --git a/deployment/ccip/changeset/view_test.go b/deployment/ccip/changeset/view_test.go index 11430bfbddf..35193979849 100644 --- a/deployment/ccip/changeset/view_test.go +++ b/deployment/ccip/changeset/view_test.go @@ -7,6 +7,7 @@ import ( ) func TestSmokeView(t *testing.T) { + t.Parallel() tenv := NewMemoryEnvironment(t, WithChains(3)) _, err := ViewCCIP(tenv.Env) require.NoError(t, err) diff --git a/deployment/common/changeset/internal/mcms_test.go b/deployment/common/changeset/internal/mcms_test.go index 10fb1d980de..8446aab4bfe 100644 --- a/deployment/common/changeset/internal/mcms_test.go +++ b/deployment/common/changeset/internal/mcms_test.go @@ -2,7 +2,6 @@ package internal_test import ( "encoding/json" - "math/big" "testing" chainsel "github.com/smartcontractkit/chain-selectors" @@ -11,6 +10,7 @@ import ( "github.com/smartcontractkit/chainlink/deployment" "github.com/smartcontractkit/chainlink/deployment/common/changeset" "github.com/smartcontractkit/chainlink/deployment/common/changeset/internal" + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" "github.com/smartcontractkit/chainlink/deployment/common/types" "github.com/smartcontractkit/chainlink/deployment/environment/memory" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -23,7 +23,7 @@ func TestDeployMCMSWithConfig(t *testing.T) { }) ab := deployment.NewMemoryAddressBook() _, err := internal.DeployMCMSWithConfig(types.ProposerManyChainMultisig, - lggr, chains[chainsel.TEST_90000001.Selector], ab, changeset.SingleGroupMCMS(t)) + lggr, chains[chainsel.TEST_90000001.Selector], ab, proposalutils.SingleGroupMCMS(t)) require.NoError(t, err) } @@ -35,12 +35,7 @@ func TestDeployMCMSWithTimelockContracts(t *testing.T) { ab := deployment.NewMemoryAddressBook() _, err := internal.DeployMCMSWithTimelockContracts(lggr, chains[chainsel.TEST_90000001.Selector], - ab, types.MCMSWithTimelockConfig{ - Canceller: changeset.SingleGroupMCMS(t), - Bypasser: changeset.SingleGroupMCMS(t), - Proposer: changeset.SingleGroupMCMS(t), - TimelockMinDelay: big.NewInt(0), - }) + ab, proposalutils.SingleGroupTimelockConfig(t)) require.NoError(t, err) addresses, err := ab.AddressesForChain(chainsel.TEST_90000001.Selector) require.NoError(t, err) diff --git a/deployment/common/changeset/state.go b/deployment/common/changeset/state.go index a580c13b40b..c45fe6ba9b5 100644 --- a/deployment/common/changeset/state.go +++ b/deployment/common/changeset/state.go @@ -5,9 +5,9 @@ import ( "fmt" "github.com/ethereum/go-ethereum/common" - owner_helpers "github.com/smartcontractkit/ccip-owner-contracts/pkg/gethwrappers" "github.com/smartcontractkit/chainlink/deployment" + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" "github.com/smartcontractkit/chainlink/deployment/common/types" "github.com/smartcontractkit/chainlink/deployment/common/view/v1_0" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/link_token_interface" @@ -19,32 +19,18 @@ import ( // It is public for use in product specific packages. // Either all fields are nil or all fields are non-nil. type MCMSWithTimelockState struct { - CancellerMcm *owner_helpers.ManyChainMultiSig - BypasserMcm *owner_helpers.ManyChainMultiSig - ProposerMcm *owner_helpers.ManyChainMultiSig - Timelock *owner_helpers.RBACTimelock - CallProxy *owner_helpers.CallProxy + *proposalutils.MCMSWithTimelockContracts } -// Validate checks that all fields are non-nil, ensuring it's ready -// for use generating views or interactions. -func (state MCMSWithTimelockState) Validate() error { - if state.Timelock == nil { - return errors.New("timelock not found") - } - if state.CancellerMcm == nil { - return errors.New("canceller not found") - } - if state.ProposerMcm == nil { - return errors.New("proposer not found") - } - if state.BypasserMcm == nil { - return errors.New("bypasser not found") - } - if state.CallProxy == nil { - return errors.New("call proxy not found") +func MaybeLoadMCMSWithTimelockState(chain deployment.Chain, addresses map[string]deployment.TypeAndVersion) (*MCMSWithTimelockState, error) { + contracts, err := proposalutils.MaybeLoadMCMSWithTimelockContracts(chain, addresses) + if err != nil { + return nil, err } - return nil + + return &MCMSWithTimelockState{ + MCMSWithTimelockContracts: contracts, + }, nil } func (state MCMSWithTimelockState) GenerateMCMSWithTimelockView() (v1_0.MCMSWithTimelockView, error) { @@ -80,68 +66,6 @@ func (state MCMSWithTimelockState) GenerateMCMSWithTimelockView() (v1_0.MCMSWith }, nil } -// MaybeLoadMCMSWithTimelockState looks for the addresses corresponding to -// contracts deployed with DeployMCMSWithTimelock and loads them into a -// MCMSWithTimelockState struct. If none of the contracts are found, the state struct will be nil. -// An error indicates: -// - Found but was unable to load a contract -// - It only found part of the bundle of contracts -// - If found more than one instance of a contract (we expect one bundle in the given addresses) -func MaybeLoadMCMSWithTimelockState(chain deployment.Chain, addresses map[string]deployment.TypeAndVersion) (*MCMSWithTimelockState, error) { - state := MCMSWithTimelockState{} - // We expect one of each contract on the chain. - timelock := deployment.NewTypeAndVersion(types.RBACTimelock, deployment.Version1_0_0) - callProxy := deployment.NewTypeAndVersion(types.CallProxy, deployment.Version1_0_0) - proposer := deployment.NewTypeAndVersion(types.ProposerManyChainMultisig, deployment.Version1_0_0) - canceller := deployment.NewTypeAndVersion(types.CancellerManyChainMultisig, deployment.Version1_0_0) - bypasser := deployment.NewTypeAndVersion(types.BypasserManyChainMultisig, deployment.Version1_0_0) - - // Ensure we either have the bundle or not. - _, err := deployment.AddressesContainBundle(addresses, - map[deployment.TypeAndVersion]struct{}{ - timelock: {}, proposer: {}, canceller: {}, bypasser: {}, callProxy: {}, - }) - if err != nil { - return nil, fmt.Errorf("unable to check MCMS contracts on chain %s error: %w", chain.Name(), err) - } - - for address, tvStr := range addresses { - switch tvStr { - case timelock: - tl, err := owner_helpers.NewRBACTimelock(common.HexToAddress(address), chain.Client) - if err != nil { - return nil, err - } - state.Timelock = tl - case callProxy: - cp, err := owner_helpers.NewCallProxy(common.HexToAddress(address), chain.Client) - if err != nil { - return nil, err - } - state.CallProxy = cp - case proposer: - mcms, err := owner_helpers.NewManyChainMultiSig(common.HexToAddress(address), chain.Client) - if err != nil { - return nil, err - } - state.ProposerMcm = mcms - case bypasser: - mcms, err := owner_helpers.NewManyChainMultiSig(common.HexToAddress(address), chain.Client) - if err != nil { - return nil, err - } - state.BypasserMcm = mcms - case canceller: - mcms, err := owner_helpers.NewManyChainMultiSig(common.HexToAddress(address), chain.Client) - if err != nil { - return nil, err - } - state.CancellerMcm = mcms - } - } - return &state, nil -} - type LinkTokenState struct { LinkToken *link_token.LinkToken } diff --git a/deployment/common/changeset/test_helpers.go b/deployment/common/changeset/test_helpers.go index 8fce5ea79f2..e92b36e5b55 100644 --- a/deployment/common/changeset/test_helpers.go +++ b/deployment/common/changeset/test_helpers.go @@ -9,6 +9,7 @@ import ( "github.com/smartcontractkit/chainlink-testing-framework/lib/utils/testcontext" "github.com/smartcontractkit/chainlink/deployment" + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" ) type ChangesetApplication struct { @@ -32,7 +33,7 @@ func WrapChangeSet[C any](fn deployment.ChangeSet[C]) func(e deployment.Environm } // ApplyChangesets applies the changeset applications to the environment and returns the updated environment. -func ApplyChangesets(t *testing.T, e deployment.Environment, timelockContractsPerChain map[uint64]*TimelockExecutionContracts, changesetApplications []ChangesetApplication) (deployment.Environment, error) { +func ApplyChangesets(t *testing.T, e deployment.Environment, timelockContractsPerChain map[uint64]*proposalutils.TimelockExecutionContracts, changesetApplications []ChangesetApplication) (deployment.Environment, error) { currentEnv := e for i, csa := range changesetApplications { out, err := csa.Changeset(currentEnv, csa.Config) @@ -72,14 +73,14 @@ func ApplyChangesets(t *testing.T, e deployment.Environment, timelockContractsPe chains.Add(uint64(op.ChainIdentifier)) } - signed := SignProposal(t, e, &prop) + signed := proposalutils.SignProposal(t, e, &prop) for _, sel := range chains.ToSlice() { timelockContracts, ok := timelockContractsPerChain[sel] if !ok || timelockContracts == nil { return deployment.Environment{}, fmt.Errorf("timelock contracts not found for chain %d", sel) } - ExecuteProposal(t, e, signed, timelockContracts, sel) + proposalutils.ExecuteProposal(t, e, signed, timelockContracts, sel) } } } @@ -91,6 +92,7 @@ func ApplyChangesets(t *testing.T, e deployment.Environment, timelockContractsPe NodeIDs: e.NodeIDs, Offchain: e.Offchain, OCRSecrets: e.OCRSecrets, + GetContext: e.GetContext, } } return currentEnv, nil diff --git a/deployment/common/changeset/transfer_to_mcms_with_timelock_test.go b/deployment/common/changeset/transfer_to_mcms_with_timelock_test.go index 6c68924b35e..40cef99a54f 100644 --- a/deployment/common/changeset/transfer_to_mcms_with_timelock_test.go +++ b/deployment/common/changeset/transfer_to_mcms_with_timelock_test.go @@ -6,8 +6,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/require" - "math/big" - + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" "github.com/smartcontractkit/chainlink/deployment/common/types" "github.com/smartcontractkit/chainlink/deployment/environment/memory" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -28,12 +27,7 @@ func TestTransferToMCMSWithTimelock(t *testing.T) { { Changeset: WrapChangeSet(DeployMCMSWithTimelock), Config: map[uint64]types.MCMSWithTimelockConfig{ - chain1: { - Canceller: SingleGroupMCMS(t), - Bypasser: SingleGroupMCMS(t), - Proposer: SingleGroupMCMS(t), - TimelockMinDelay: big.NewInt(0), - }, + chain1: proposalutils.SingleGroupTimelockConfig(t), }, }, }) @@ -44,7 +38,7 @@ func TestTransferToMCMSWithTimelock(t *testing.T) { require.NoError(t, err) link, err := MaybeLoadLinkTokenState(e.Chains[chain1], addrs) require.NoError(t, err) - e, err = ApplyChangesets(t, e, map[uint64]*TimelockExecutionContracts{ + e, err = ApplyChangesets(t, e, map[uint64]*proposalutils.TimelockExecutionContracts{ chain1: { Timelock: state.Timelock, CallProxy: state.CallProxy, diff --git a/deployment/common/proposalutils/mcms_helpers.go b/deployment/common/proposalutils/mcms_helpers.go new file mode 100644 index 00000000000..4a7540761ee --- /dev/null +++ b/deployment/common/proposalutils/mcms_helpers.go @@ -0,0 +1,273 @@ +package proposalutils + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + owner_helpers "github.com/smartcontractkit/ccip-owner-contracts/pkg/gethwrappers" + "github.com/smartcontractkit/ccip-owner-contracts/pkg/proposal/mcms" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink/deployment" + "github.com/smartcontractkit/chainlink/deployment/common/types" +) + +// TimelockExecutionContracts is a helper struct for executing timelock proposals. it contains +// the timelock and call proxy contracts. +type TimelockExecutionContracts struct { + Timelock *owner_helpers.RBACTimelock + CallProxy *owner_helpers.CallProxy +} + +// NewTimelockExecutionContracts creates a new TimelockExecutionContracts struct. +// If there are multiple timelocks or call proxy on the chain, an error is returned. +// If there is a missing timelocks or call proxy on the chain, an error is returned. +func NewTimelockExecutionContracts(env deployment.Environment, chainSelector uint64) (*TimelockExecutionContracts, error) { + addrTypeVer, err := env.ExistingAddresses.AddressesForChain(chainSelector) + if err != nil { + return nil, fmt.Errorf("error getting addresses for chain: %w", err) + } + var timelock *owner_helpers.RBACTimelock + var callProxy *owner_helpers.CallProxy + for addr, tv := range addrTypeVer { + if tv.Type == types.RBACTimelock { + if timelock != nil { + return nil, fmt.Errorf("multiple timelocks found on chain %d", chainSelector) + } + var err error + timelock, err = owner_helpers.NewRBACTimelock(common.HexToAddress(addr), env.Chains[chainSelector].Client) + if err != nil { + return nil, fmt.Errorf("error creating timelock: %w", err) + } + } + if tv.Type == types.CallProxy { + if callProxy != nil { + return nil, fmt.Errorf("multiple call proxies found on chain %d", chainSelector) + } + var err error + callProxy, err = owner_helpers.NewCallProxy(common.HexToAddress(addr), env.Chains[chainSelector].Client) + if err != nil { + return nil, fmt.Errorf("error creating call proxy: %w", err) + } + } + } + if timelock == nil || callProxy == nil { + return nil, fmt.Errorf("missing timelock (%T) or call proxy(%T) on chain %d", timelock == nil, callProxy == nil, chainSelector) + } + return &TimelockExecutionContracts{ + Timelock: timelock, + CallProxy: callProxy, + }, nil +} + +type RunTimelockExecutorConfig struct { + Executor *mcms.Executor + TimelockContracts *TimelockExecutionContracts + ChainSelector uint64 + // BlockStart is optional. It filter the timelock scheduled events. + // If not provided, the executor assumes that the operations have not been executed yet + // executes all the operations for the given chain. + BlockStart *uint64 + BlockEnd *uint64 +} + +func (cfg RunTimelockExecutorConfig) Validate() error { + if cfg.Executor == nil { + return fmt.Errorf("executor is nil") + } + if cfg.TimelockContracts == nil { + return fmt.Errorf("timelock contracts is nil") + } + if cfg.ChainSelector == 0 { + return fmt.Errorf("chain selector is 0") + } + if cfg.BlockStart != nil && cfg.BlockEnd == nil { + if *cfg.BlockStart > *cfg.BlockEnd { + return fmt.Errorf("block start is greater than block end") + } + } + if cfg.BlockStart == nil && cfg.BlockEnd != nil { + return fmt.Errorf("block start must not be nil when block end is not nil") + } + + if len(cfg.Executor.Operations[mcms.ChainIdentifier(cfg.ChainSelector)]) == 0 { + return fmt.Errorf("no operations for chain %d", cfg.ChainSelector) + } + return nil +} + +// RunTimelockExecutor runs the scheduled operations for the given chain. +// If the block start is not provided, it assumes that the operations have not been scheduled yet +// and executes all the operations for the given chain. +// It is an error if there are no operations for the given chain. +func RunTimelockExecutor(env deployment.Environment, cfg RunTimelockExecutorConfig) error { + // TODO: This sort of helper probably should move to the MCMS lib. + // Execute all the transactions in the proposal which are for this chain. + if err := cfg.Validate(); err != nil { + return fmt.Errorf("error validating config: %w", err) + } + for _, chainOp := range cfg.Executor.Operations[mcms.ChainIdentifier(cfg.ChainSelector)] { + for idx, op := range cfg.Executor.ChainAgnosticOps { + start := cfg.BlockStart + end := cfg.BlockEnd + if bytes.Equal(op.Data, chainOp.Data) && op.To == chainOp.To { + if start == nil { + opTx, err2 := cfg.Executor.ExecuteOnChain(env.Chains[cfg.ChainSelector].Client, env.Chains[cfg.ChainSelector].DeployerKey, idx) + if err2 != nil { + return fmt.Errorf("error executing on chain: %w", err2) + } + block, err2 := env.Chains[cfg.ChainSelector].Confirm(opTx) + if err2 != nil { + return fmt.Errorf("error confirming on chain: %w", err2) + } + start = &block + end = &block + } + + it, err2 := cfg.TimelockContracts.Timelock.FilterCallScheduled(&bind.FilterOpts{ + Start: *start, + End: end, + Context: env.GetContext(), + }, nil, nil) + if err2 != nil { + return fmt.Errorf("error filtering call scheduled: %w", err2) + } + var calls []owner_helpers.RBACTimelockCall + var pred, salt [32]byte + for it.Next() { + // Note these are the same for the whole batch, can overwrite + pred = it.Event.Predecessor + salt = it.Event.Salt + verboseDebug(env.Logger, it.Event) + env.Logger.Info("scheduled", "event", it.Event) + calls = append(calls, owner_helpers.RBACTimelockCall{ + Target: it.Event.Target, + Data: it.Event.Data, + Value: it.Event.Value, + }) + } + + timelockExecutorProxy, err := owner_helpers.NewRBACTimelock(cfg.TimelockContracts.CallProxy.Address(), env.Chains[cfg.ChainSelector].Client) + if err != nil { + return fmt.Errorf("error creating timelock executor proxy: %w", err) + } + tx, err := timelockExecutorProxy.ExecuteBatch( + env.Chains[cfg.ChainSelector].DeployerKey, calls, pred, salt) + if err != nil { + return fmt.Errorf("error executing batch: %w", err) + } + _, err = env.Chains[cfg.ChainSelector].Confirm(tx) + if err != nil { + return fmt.Errorf("error confirming batch: %w", err) + } + } + } + } + return nil +} + +func verboseDebug(lggr logger.Logger, event *owner_helpers.RBACTimelockCallScheduled) { + b, err := json.Marshal(event) + if err != nil { + panic(err) + } + lggr.Debug("scheduled", "event", string(b)) +} + +// MCMSWithTimelockContracts holds the Go bindings +// for a MCMSWithTimelock contract deployment. +// It is public for use in product specific packages. +// Either all fields are nil or all fields are non-nil. +type MCMSWithTimelockContracts struct { + CancellerMcm *owner_helpers.ManyChainMultiSig + BypasserMcm *owner_helpers.ManyChainMultiSig + ProposerMcm *owner_helpers.ManyChainMultiSig + Timelock *owner_helpers.RBACTimelock + CallProxy *owner_helpers.CallProxy +} + +// Validate checks that all fields are non-nil, ensuring it's ready +// for use generating views or interactions. +func (state MCMSWithTimelockContracts) Validate() error { + if state.Timelock == nil { + return errors.New("timelock not found") + } + if state.CancellerMcm == nil { + return errors.New("canceller not found") + } + if state.ProposerMcm == nil { + return errors.New("proposer not found") + } + if state.BypasserMcm == nil { + return errors.New("bypasser not found") + } + if state.CallProxy == nil { + return errors.New("call proxy not found") + } + return nil +} + +// MaybeLoadMCMSWithTimelockContracts looks for the addresses corresponding to +// contracts deployed with DeployMCMSWithTimelock and loads them into a +// MCMSWithTimelockState struct. If none of the contracts are found, the state struct will be nil. +// An error indicates: +// - Found but was unable to load a contract +// - It only found part of the bundle of contracts +// - If found more than one instance of a contract (we expect one bundle in the given addresses) +func MaybeLoadMCMSWithTimelockContracts(chain deployment.Chain, addresses map[string]deployment.TypeAndVersion) (*MCMSWithTimelockContracts, error) { + state := MCMSWithTimelockContracts{} + // We expect one of each contract on the chain. + timelock := deployment.NewTypeAndVersion(types.RBACTimelock, deployment.Version1_0_0) + callProxy := deployment.NewTypeAndVersion(types.CallProxy, deployment.Version1_0_0) + proposer := deployment.NewTypeAndVersion(types.ProposerManyChainMultisig, deployment.Version1_0_0) + canceller := deployment.NewTypeAndVersion(types.CancellerManyChainMultisig, deployment.Version1_0_0) + bypasser := deployment.NewTypeAndVersion(types.BypasserManyChainMultisig, deployment.Version1_0_0) + + // Ensure we either have the bundle or not. + _, err := deployment.AddressesContainBundle(addresses, + map[deployment.TypeAndVersion]struct{}{ + timelock: {}, proposer: {}, canceller: {}, bypasser: {}, callProxy: {}, + }) + if err != nil { + return nil, fmt.Errorf("unable to check MCMS contracts on chain %s error: %w", chain.Name(), err) + } + + for address, tvStr := range addresses { + switch tvStr { + case timelock: + tl, err := owner_helpers.NewRBACTimelock(common.HexToAddress(address), chain.Client) + if err != nil { + return nil, err + } + state.Timelock = tl + case callProxy: + cp, err := owner_helpers.NewCallProxy(common.HexToAddress(address), chain.Client) + if err != nil { + return nil, err + } + state.CallProxy = cp + case proposer: + mcms, err := owner_helpers.NewManyChainMultiSig(common.HexToAddress(address), chain.Client) + if err != nil { + return nil, err + } + state.ProposerMcm = mcms + case bypasser: + mcms, err := owner_helpers.NewManyChainMultiSig(common.HexToAddress(address), chain.Client) + if err != nil { + return nil, err + } + state.BypasserMcm = mcms + case canceller: + mcms, err := owner_helpers.NewManyChainMultiSig(common.HexToAddress(address), chain.Client) + if err != nil { + return nil, err + } + state.CancellerMcm = mcms + } + } + return &state, nil +} diff --git a/deployment/common/changeset/mcms_test_helpers.go b/deployment/common/proposalutils/mcms_test_helpers.go similarity index 54% rename from deployment/common/changeset/mcms_test_helpers.go rename to deployment/common/proposalutils/mcms_test_helpers.go index ffa99114d74..610fe84f34c 100644 --- a/deployment/common/changeset/mcms_test_helpers.go +++ b/deployment/common/proposalutils/mcms_test_helpers.go @@ -1,22 +1,21 @@ -package changeset +package proposalutils import ( - "bytes" - "context" "crypto/ecdsa" + "math/big" "testing" - "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/smartcontractkit/ccip-owner-contracts/pkg/config" - owner_helpers "github.com/smartcontractkit/ccip-owner-contracts/pkg/gethwrappers" "github.com/smartcontractkit/ccip-owner-contracts/pkg/proposal/mcms" "github.com/smartcontractkit/ccip-owner-contracts/pkg/proposal/timelock" chainsel "github.com/smartcontractkit/chain-selectors" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/deployment" + commontypes "github.com/smartcontractkit/chainlink/deployment/common/types" + // "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" ) var ( @@ -25,13 +24,6 @@ var ( TestXXXMCMSSigner *ecdsa.PrivateKey ) -// TimelockExecutionContracts is a helper struct for executing timelock proposals. it contains -// the timelock and call proxy contracts. -type TimelockExecutionContracts struct { - Timelock *owner_helpers.RBACTimelock - CallProxy *owner_helpers.CallProxy -} - func init() { key, err := crypto.GenerateKey() if err != nil { @@ -79,45 +71,22 @@ func ExecuteProposal(t *testing.T, env deployment.Environment, executor *mcms.Ex if err2 != nil { require.NoError(t, deployment.MaybeDataErr(err2)) } + _, err2 = env.Chains[sel].Confirm(tx) require.NoError(t, err2) + cfg := RunTimelockExecutorConfig{ + Executor: executor, + TimelockContracts: timelockContracts, + ChainSelector: sel, + } + require.NoError(t, RunTimelockExecutor(env, cfg)) +} - // TODO: This sort of helper probably should move to the MCMS lib. - // Execute all the transactions in the proposal which are for this chain. - for _, chainOp := range executor.Operations[mcms.ChainIdentifier(sel)] { - for idx, op := range executor.ChainAgnosticOps { - if bytes.Equal(op.Data, chainOp.Data) && op.To == chainOp.To { - opTx, err3 := executor.ExecuteOnChain(env.Chains[sel].Client, env.Chains[sel].DeployerKey, idx) - require.NoError(t, err3) - block, err3 := env.Chains[sel].Confirm(opTx) - require.NoError(t, err3) - t.Log("executed", chainOp) - it, err3 := timelockContracts.Timelock.FilterCallScheduled(&bind.FilterOpts{ - Start: block, - End: &block, - Context: context.Background(), - }, nil, nil) - require.NoError(t, err3) - var calls []owner_helpers.RBACTimelockCall - var pred, salt [32]byte - for it.Next() { - // Note these are the same for the whole batch, can overwrite - pred = it.Event.Predecessor - salt = it.Event.Salt - t.Log("scheduled", it.Event) - calls = append(calls, owner_helpers.RBACTimelockCall{ - Target: it.Event.Target, - Data: it.Event.Data, - Value: it.Event.Value, - }) - } - timelockExecutorProxy, err := owner_helpers.NewRBACTimelock(timelockContracts.CallProxy.Address(), env.Chains[sel].Client) - tx, err := timelockExecutorProxy.ExecuteBatch( - env.Chains[sel].DeployerKey, calls, pred, salt) - require.NoError(t, err) - _, err = env.Chains[sel].Confirm(tx) - require.NoError(t, err) - } - } +func SingleGroupTimelockConfig(t *testing.T) commontypes.MCMSWithTimelockConfig { + return commontypes.MCMSWithTimelockConfig{ + Canceller: SingleGroupMCMS(t), + Bypasser: SingleGroupMCMS(t), + Proposer: SingleGroupMCMS(t), + TimelockMinDelay: big.NewInt(0), } } diff --git a/deployment/keystone/changeset/accept_ownership_test.go b/deployment/keystone/changeset/accept_ownership_test.go index b2aa1b20194..9e9d29e563a 100644 --- a/deployment/keystone/changeset/accept_ownership_test.go +++ b/deployment/keystone/changeset/accept_ownership_test.go @@ -1,7 +1,6 @@ package changeset_test import ( - "math/big" "testing" "github.com/stretchr/testify/require" @@ -10,6 +9,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" commonchangeset "github.com/smartcontractkit/chainlink/deployment/common/changeset" + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" "github.com/smartcontractkit/chainlink/deployment/common/types" "github.com/smartcontractkit/chainlink/deployment/environment/memory" "github.com/smartcontractkit/chainlink/deployment/keystone/changeset" @@ -44,12 +44,7 @@ func TestAcceptAllOwnership(t *testing.T) { { Changeset: commonchangeset.WrapChangeSet(commonchangeset.DeployMCMSWithTimelock), Config: map[uint64]types.MCMSWithTimelockConfig{ - registrySel: { - Canceller: commonchangeset.SingleGroupMCMS(t), - Bypasser: commonchangeset.SingleGroupMCMS(t), - Proposer: commonchangeset.SingleGroupMCMS(t), - TimelockMinDelay: big.NewInt(0), - }, + registrySel: proposalutils.SingleGroupTimelockConfig(t), }, }, }) @@ -59,8 +54,8 @@ func TestAcceptAllOwnership(t *testing.T) { timelock, err := commonchangeset.MaybeLoadMCMSWithTimelockState(env.Chains[registrySel], addrs) require.NoError(t, err) - _, err = commonchangeset.ApplyChangesets(t, env, map[uint64]*commonchangeset.TimelockExecutionContracts{ - registrySel: &commonchangeset.TimelockExecutionContracts{ + _, err = commonchangeset.ApplyChangesets(t, env, map[uint64]*proposalutils.TimelockExecutionContracts{ + registrySel: &proposalutils.TimelockExecutionContracts{ Timelock: timelock.Timelock, CallProxy: timelock.CallProxy, }, diff --git a/deployment/keystone/changeset/append_node_capabilities_test.go b/deployment/keystone/changeset/append_node_capabilities_test.go index 159500ab5a7..bfc01b309f5 100644 --- a/deployment/keystone/changeset/append_node_capabilities_test.go +++ b/deployment/keystone/changeset/append_node_capabilities_test.go @@ -8,6 +8,7 @@ import ( "golang.org/x/exp/maps" commonchangeset "github.com/smartcontractkit/chainlink/deployment/common/changeset" + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" "github.com/smartcontractkit/chainlink/deployment/keystone/changeset" kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/capabilities_registry" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey" @@ -87,7 +88,7 @@ func TestAppendNodeCapabilities(t *testing.T) { // now apply the changeset such that the proposal is signed and execed contracts := te.ContractSets()[te.RegistrySelector] - timelockContracts := map[uint64]*commonchangeset.TimelockExecutionContracts{ + timelockContracts := map[uint64]*proposalutils.TimelockExecutionContracts{ te.RegistrySelector: { Timelock: contracts.Timelock, CallProxy: contracts.CallProxy, diff --git a/deployment/keystone/changeset/deploy_forwarder_test.go b/deployment/keystone/changeset/deploy_forwarder_test.go index dd894fde9d9..e04bac6d264 100644 --- a/deployment/keystone/changeset/deploy_forwarder_test.go +++ b/deployment/keystone/changeset/deploy_forwarder_test.go @@ -11,6 +11,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink/deployment" commonchangeset "github.com/smartcontractkit/chainlink/deployment/common/changeset" + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" "github.com/smartcontractkit/chainlink/deployment/environment/memory" "github.com/smartcontractkit/chainlink/deployment/keystone/changeset" ) @@ -116,11 +117,11 @@ func TestConfigureForwarders(t *testing.T) { require.Len(t, csOut.Proposals, nChains) require.Nil(t, csOut.AddressBook) - timelockContracts := make(map[uint64]*commonchangeset.TimelockExecutionContracts) + timelockContracts := make(map[uint64]*proposalutils.TimelockExecutionContracts) for selector, contractSet := range te.ContractSets() { require.NotNil(t, contractSet.Timelock) require.NotNil(t, contractSet.CallProxy) - timelockContracts[selector] = &commonchangeset.TimelockExecutionContracts{ + timelockContracts[selector] = &proposalutils.TimelockExecutionContracts{ Timelock: contractSet.Timelock, CallProxy: contractSet.CallProxy, } diff --git a/deployment/keystone/changeset/deploy_ocr3_test.go b/deployment/keystone/changeset/deploy_ocr3_test.go index 5d02f83500d..7a276886242 100644 --- a/deployment/keystone/changeset/deploy_ocr3_test.go +++ b/deployment/keystone/changeset/deploy_ocr3_test.go @@ -13,6 +13,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" commonchangeset "github.com/smartcontractkit/chainlink/deployment/common/changeset" + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" "github.com/smartcontractkit/chainlink/deployment/environment/memory" kslib "github.com/smartcontractkit/chainlink/deployment/keystone" "github.com/smartcontractkit/chainlink/deployment/keystone/changeset" @@ -118,7 +119,7 @@ func TestConfigureOCR3(t *testing.T) { contracts := te.ContractSets()[te.RegistrySelector] require.NoError(t, err) - var timelockContracts = map[uint64]*commonchangeset.TimelockExecutionContracts{ + var timelockContracts = map[uint64]*proposalutils.TimelockExecutionContracts{ te.RegistrySelector: { Timelock: contracts.Timelock, CallProxy: contracts.CallProxy, diff --git a/deployment/keystone/changeset/helpers_test.go b/deployment/keystone/changeset/helpers_test.go index 4e7553d0b8e..d956db991de 100644 --- a/deployment/keystone/changeset/helpers_test.go +++ b/deployment/keystone/changeset/helpers_test.go @@ -8,7 +8,6 @@ import ( "errors" "fmt" "math" - "math/big" "sort" "testing" @@ -21,6 +20,7 @@ import ( "github.com/smartcontractkit/chainlink/deployment" commonchangeset "github.com/smartcontractkit/chainlink/deployment/common/changeset" + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" commontypes "github.com/smartcontractkit/chainlink/deployment/common/types" "github.com/smartcontractkit/chainlink/deployment/environment/memory" "github.com/smartcontractkit/chainlink/deployment/keystone" @@ -258,12 +258,7 @@ func SetupTestEnv(t *testing.T, c TestConfig) TestEnv { timelockCfgs := make(map[uint64]commontypes.MCMSWithTimelockConfig) for sel := range env.Chains { t.Logf("Enabling MCMS on chain %d", sel) - timelockCfgs[sel] = commontypes.MCMSWithTimelockConfig{ - Canceller: commonchangeset.SingleGroupMCMS(t), - Bypasser: commonchangeset.SingleGroupMCMS(t), - Proposer: commonchangeset.SingleGroupMCMS(t), - TimelockMinDelay: big.NewInt(0), - } + timelockCfgs[sel] = proposalutils.SingleGroupTimelockConfig(t) } env, err = commonchangeset.ApplyChangesets(t, env, nil, []commonchangeset.ChangesetApplication{ { @@ -284,7 +279,7 @@ func SetupTestEnv(t *testing.T, c TestConfig) TestEnv { require.NoError(t, mcms.Validate()) // transfer ownership of all contracts to the MCMS - env, err = commonchangeset.ApplyChangesets(t, env, map[uint64]*commonchangeset.TimelockExecutionContracts{sel: {Timelock: mcms.Timelock, CallProxy: mcms.CallProxy}}, []commonchangeset.ChangesetApplication{ + env, err = commonchangeset.ApplyChangesets(t, env, map[uint64]*proposalutils.TimelockExecutionContracts{sel: {Timelock: mcms.Timelock, CallProxy: mcms.CallProxy}}, []commonchangeset.ChangesetApplication{ { Changeset: commonchangeset.WrapChangeSet(kschangeset.AcceptAllOwnershipsProposal), Config: &kschangeset.AcceptAllOwnershipRequest{ diff --git a/deployment/keystone/changeset/update_don_test.go b/deployment/keystone/changeset/update_don_test.go index 18287da6887..64cb41c14e5 100644 --- a/deployment/keystone/changeset/update_don_test.go +++ b/deployment/keystone/changeset/update_don_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/require" commonchangeset "github.com/smartcontractkit/chainlink/deployment/common/changeset" + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" "github.com/smartcontractkit/chainlink/deployment/keystone/changeset" "github.com/smartcontractkit/chainlink/deployment/keystone/changeset/internal" kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/capabilities_registry" @@ -118,7 +119,7 @@ func TestUpdateDon(t *testing.T) { // now apply the changeset such that the proposal is signed and execed contracts := te.ContractSets()[te.RegistrySelector] - timelockContracts := map[uint64]*commonchangeset.TimelockExecutionContracts{ + timelockContracts := map[uint64]*proposalutils.TimelockExecutionContracts{ te.RegistrySelector: { Timelock: contracts.Timelock, CallProxy: contracts.CallProxy, diff --git a/deployment/keystone/changeset/update_node_capabilities_test.go b/deployment/keystone/changeset/update_node_capabilities_test.go index cb5588ff3d1..87b49acf614 100644 --- a/deployment/keystone/changeset/update_node_capabilities_test.go +++ b/deployment/keystone/changeset/update_node_capabilities_test.go @@ -8,6 +8,7 @@ import ( "golang.org/x/exp/maps" commonchangeset "github.com/smartcontractkit/chainlink/deployment/common/changeset" + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" "github.com/smartcontractkit/chainlink/deployment/keystone/changeset" kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/capabilities_registry" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey" @@ -118,7 +119,7 @@ func TestUpdateNodeCapabilities(t *testing.T) { // now apply the changeset such that the proposal is signed and execed contracts := te.ContractSets()[te.RegistrySelector] - timelockContracts := map[uint64]*commonchangeset.TimelockExecutionContracts{ + timelockContracts := map[uint64]*proposalutils.TimelockExecutionContracts{ te.RegistrySelector: { Timelock: contracts.Timelock, CallProxy: contracts.CallProxy, diff --git a/deployment/keystone/changeset/update_nodes_test.go b/deployment/keystone/changeset/update_nodes_test.go index be3bfb12ee6..31f71cd9603 100644 --- a/deployment/keystone/changeset/update_nodes_test.go +++ b/deployment/keystone/changeset/update_nodes_test.go @@ -9,6 +9,7 @@ import ( "golang.org/x/exp/maps" commonchangeset "github.com/smartcontractkit/chainlink/deployment/common/changeset" + "github.com/smartcontractkit/chainlink/deployment/common/proposalutils" "github.com/smartcontractkit/chainlink/deployment/keystone/changeset" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey" ) @@ -89,7 +90,7 @@ func TestUpdateNodes(t *testing.T) { // now apply the changeset such that the proposal is signed and execed contracts := te.ContractSets()[te.RegistrySelector] - timelockContracts := map[uint64]*commonchangeset.TimelockExecutionContracts{ + timelockContracts := map[uint64]*proposalutils.TimelockExecutionContracts{ te.RegistrySelector: { Timelock: contracts.Timelock, CallProxy: contracts.CallProxy, From 83f7413501593058443687347d0b2078410bcc1d Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Thu, 12 Dec 2024 16:57:18 +0000 Subject: [PATCH 2/5] Refactor workflow registry to use querykeys api (#15638) * temp disable test * wip * fix duplicate event bug - change contract reader poll query from Gte to Gt * event state sync test added and passing with fixes - single event * added methods to create different event types * pre-refactor checkpoint - up to this commit no changes have been made to wf registry - test has been added to confirm (and actually has found bugs) in current wf registry behaviour * refactor part 1: replaced querykey with querykeys - all tests passing * refactor part 2: collapse individual event query into single events query and reduce goroutine and channel usage * refactor part 3 - down to single ticker thread * tidy * enable the event ordering test that was failing in the old workflow syncer * lint remove change to try and resolve flaky eth smoke tests * log fix --- .mockery.yaml | 6 - .../workflows/syncer/workflow_syncer_test.go | 215 +++++++++++- .../workflows/syncer/contract_reader_mock.go | 302 ----------------- core/services/workflows/syncer/heap.go | 63 ---- .../workflows/syncer/workflow_registry.go | 309 +++++------------- .../syncer/workflow_registry_test.go | 234 ------------- 6 files changed, 291 insertions(+), 838 deletions(-) delete mode 100644 core/services/workflows/syncer/contract_reader_mock.go delete mode 100644 core/services/workflows/syncer/heap.go delete mode 100644 core/services/workflows/syncer/workflow_registry_test.go diff --git a/.mockery.yaml b/.mockery.yaml index dd9024cc066..5777ca1da92 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -583,12 +583,6 @@ packages: github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer: interfaces: ORM: - ContractReader: - config: - mockname: "Mock{{ .InterfaceName }}" - filename: contract_reader_mock.go - inpackage: true - dir: "{{ .InterfaceDir }}" Handler: config: mockname: "Mock{{ .InterfaceName }}" diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 3c6ee8a1d04..066e85e839f 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -6,7 +6,9 @@ import ( "encoding/base64" "encoding/hex" "fmt" + rand2 "math/rand/v2" "strings" + "sync" "testing" "time" @@ -31,17 +33,38 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" "github.com/stretchr/testify/require" + + crypto2 "github.com/ethereum/go-ethereum/crypto" ) type testEvtHandler struct { events []syncer.Event + mux sync.Mutex } func (m *testEvtHandler) Handle(ctx context.Context, event syncer.Event) error { + m.mux.Lock() + defer m.mux.Unlock() m.events = append(m.events, event) return nil } +func (m *testEvtHandler) ClearEvents() { + m.mux.Lock() + defer m.mux.Unlock() + m.events = make([]syncer.Event, 0) +} + +func (m *testEvtHandler) GetEvents() []syncer.Event { + m.mux.Lock() + defer m.mux.Unlock() + + eventsCopy := make([]syncer.Event, len(m.events)) + copy(eventsCopy, m.events) + + return eventsCopy +} + func newTestEvtHandler() *testEvtHandler { return &testEvtHandler{ events: make([]syncer.Event, 0), @@ -68,6 +91,138 @@ func (m *testWorkflowRegistryContractLoader) LoadWorkflows(ctx context.Context, }, nil } +func Test_EventHandlerStateSync(t *testing.T) { + lggr := logger.TestLogger(t) + backendTH := testutils.NewEVMBackendTH(t) + donID := uint32(1) + + eventPollTicker := time.NewTicker(50 * time.Millisecond) + defer eventPollTicker.Stop() + + // Deploy a test workflow_registry + wfRegistryAddr, _, wfRegistryC, err := workflow_registry_wrapper.DeployWorkflowRegistry(backendTH.ContractsOwner, backendTH.Backend.Client()) + backendTH.Backend.Commit() + require.NoError(t, err) + + // setup contract state to allow the secrets to be updated + updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true) + updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true) + + // Create some initial static state + numberWorkflows := 20 + for i := 0; i < numberWorkflows; i++ { + var workflowID [32]byte + _, err = rand.Read((workflowID)[:]) + require.NoError(t, err) + workflow := RegisterWorkflowCMD{ + Name: fmt.Sprintf("test-wf-%d", i), + DonID: donID, + Status: uint8(1), + SecretsURL: "someurl", + } + workflow.ID = workflowID + registerWorkflow(t, backendTH, wfRegistryC, workflow) + } + + testEventHandler := newTestEvtHandler() + loader := syncer.NewWorkflowRegistryContractLoader(lggr, wfRegistryAddr.Hex(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return backendTH.NewContractReader(ctx, t, bytes) + }, testEventHandler) + + // Create the registry + registry := syncer.NewWorkflowRegistry( + lggr, + func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return backendTH.NewContractReader(ctx, t, bytes) + }, + wfRegistryAddr.Hex(), + syncer.WorkflowEventPollerConfig{ + QueryCount: 20, + }, + testEventHandler, + loader, + &testDonNotifier{ + don: capabilities.DON{ + ID: donID, + }, + err: nil, + }, + syncer.WithTicker(eventPollTicker.C), + ) + + servicetest.Run(t, registry) + + require.Eventually(t, func() bool { + numEvents := len(testEventHandler.GetEvents()) + return numEvents == numberWorkflows + }, 5*time.Second, time.Second) + + for _, event := range testEventHandler.GetEvents() { + assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType()) + } + + testEventHandler.ClearEvents() + + // Create different event types for a number of workflows and confirm that the event handler processes them in order + numberOfEventCycles := 50 + for i := 0; i < numberOfEventCycles; i++ { + var workflowID [32]byte + _, err = rand.Read((workflowID)[:]) + require.NoError(t, err) + workflow := RegisterWorkflowCMD{ + Name: "test-wf-register-event", + DonID: donID, + Status: uint8(1), + SecretsURL: "", + } + workflow.ID = workflowID + + // Generate events of different types with some jitter + registerWorkflow(t, backendTH, wfRegistryC, workflow) + time.Sleep(time.Millisecond * time.Duration(rand2.IntN(10))) + data := append(backendTH.ContractsOwner.From.Bytes(), []byte(workflow.Name)...) + workflowKey := crypto2.Keccak256Hash(data) + activateWorkflow(t, backendTH, wfRegistryC, workflowKey) + time.Sleep(time.Millisecond * time.Duration(rand2.IntN(10))) + pauseWorkflow(t, backendTH, wfRegistryC, workflowKey) + time.Sleep(time.Millisecond * time.Duration(rand2.IntN(10))) + var newWorkflowID [32]byte + _, err = rand.Read((newWorkflowID)[:]) + require.NoError(t, err) + updateWorkflow(t, backendTH, wfRegistryC, workflowKey, newWorkflowID, workflow.BinaryURL+"2", workflow.ConfigURL, workflow.SecretsURL) + time.Sleep(time.Millisecond * time.Duration(rand2.IntN(10))) + deleteWorkflow(t, backendTH, wfRegistryC, workflowKey) + } + + // Confirm the expected number of events are received in the correct order + require.Eventually(t, func() bool { + events := testEventHandler.GetEvents() + numEvents := len(events) + expectedNumEvents := 5 * numberOfEventCycles + + if numEvents == expectedNumEvents { + // verify the events are the expected types in the expected order + for idx, event := range events { + switch idx % 5 { + case 0: + assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType()) + case 1: + assert.Equal(t, syncer.WorkflowActivatedEvent, event.GetEventType()) + case 2: + assert.Equal(t, syncer.WorkflowPausedEvent, event.GetEventType()) + case 3: + assert.Equal(t, syncer.WorkflowUpdatedEvent, event.GetEventType()) + case 4: + assert.Equal(t, syncer.WorkflowDeletedEvent, event.GetEventType()) + } + } + return true + } + + return false + }, 50*time.Second, time.Second) +} + func Test_InitialStateSync(t *testing.T) { lggr := logger.TestLogger(t) backendTH := testutils.NewEVMBackendTH(t) @@ -128,10 +283,10 @@ func Test_InitialStateSync(t *testing.T) { servicetest.Run(t, worker) require.Eventually(t, func() bool { - return len(testEventHandler.events) == numberWorkflows + return len(testEventHandler.GetEvents()) == numberWorkflows }, 5*time.Second, time.Second) - for _, event := range testEventHandler.events { + for _, event := range testEventHandler.GetEvents() { assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType()) } } @@ -497,3 +652,59 @@ func requestForceUpdateSecrets( th.Backend.Commit() th.Backend.Commit() } + +func activateWorkflow( + t *testing.T, + th *testutils.EVMBackendTH, + wfRegC *workflow_registry_wrapper.WorkflowRegistry, + workflowKey [32]byte, +) { + t.Helper() + _, err := wfRegC.ActivateWorkflow(th.ContractsOwner, workflowKey) + require.NoError(t, err, "failed to activate workflow") + th.Backend.Commit() + th.Backend.Commit() + th.Backend.Commit() +} + +func pauseWorkflow( + t *testing.T, + th *testutils.EVMBackendTH, + wfRegC *workflow_registry_wrapper.WorkflowRegistry, + workflowKey [32]byte, +) { + t.Helper() + _, err := wfRegC.PauseWorkflow(th.ContractsOwner, workflowKey) + require.NoError(t, err, "failed to pause workflow") + th.Backend.Commit() + th.Backend.Commit() + th.Backend.Commit() +} + +func deleteWorkflow( + t *testing.T, + th *testutils.EVMBackendTH, + wfRegC *workflow_registry_wrapper.WorkflowRegistry, + workflowKey [32]byte, +) { + t.Helper() + _, err := wfRegC.DeleteWorkflow(th.ContractsOwner, workflowKey) + require.NoError(t, err, "failed to delete workflow") + th.Backend.Commit() + th.Backend.Commit() + th.Backend.Commit() +} + +func updateWorkflow( + t *testing.T, + th *testutils.EVMBackendTH, + wfRegC *workflow_registry_wrapper.WorkflowRegistry, + workflowKey [32]byte, newWorkflowID [32]byte, binaryURL string, configURL string, secretsURL string, +) { + t.Helper() + _, err := wfRegC.UpdateWorkflow(th.ContractsOwner, workflowKey, newWorkflowID, binaryURL, configURL, secretsURL) + require.NoError(t, err, "failed to update workflow") + th.Backend.Commit() + th.Backend.Commit() + th.Backend.Commit() +} diff --git a/core/services/workflows/syncer/contract_reader_mock.go b/core/services/workflows/syncer/contract_reader_mock.go deleted file mode 100644 index e6e7c8385f5..00000000000 --- a/core/services/workflows/syncer/contract_reader_mock.go +++ /dev/null @@ -1,302 +0,0 @@ -// Code generated by mockery v2.46.3. DO NOT EDIT. - -package syncer - -import ( - context "context" - - query "github.com/smartcontractkit/chainlink-common/pkg/types/query" - primitives "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" - mock "github.com/stretchr/testify/mock" - - types "github.com/smartcontractkit/chainlink-common/pkg/types" -) - -// MockContractReader is an autogenerated mock type for the ContractReader type -type MockContractReader struct { - mock.Mock -} - -type MockContractReader_Expecter struct { - mock *mock.Mock -} - -func (_m *MockContractReader) EXPECT() *MockContractReader_Expecter { - return &MockContractReader_Expecter{mock: &_m.Mock} -} - -// Bind provides a mock function with given fields: _a0, _a1 -func (_m *MockContractReader) Bind(_a0 context.Context, _a1 []types.BoundContract) error { - ret := _m.Called(_a0, _a1) - - if len(ret) == 0 { - panic("no return value specified for Bind") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []types.BoundContract) error); ok { - r0 = rf(_a0, _a1) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MockContractReader_Bind_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Bind' -type MockContractReader_Bind_Call struct { - *mock.Call -} - -// Bind is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 []types.BoundContract -func (_e *MockContractReader_Expecter) Bind(_a0 interface{}, _a1 interface{}) *MockContractReader_Bind_Call { - return &MockContractReader_Bind_Call{Call: _e.mock.On("Bind", _a0, _a1)} -} - -func (_c *MockContractReader_Bind_Call) Run(run func(_a0 context.Context, _a1 []types.BoundContract)) *MockContractReader_Bind_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]types.BoundContract)) - }) - return _c -} - -func (_c *MockContractReader_Bind_Call) Return(_a0 error) *MockContractReader_Bind_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockContractReader_Bind_Call) RunAndReturn(run func(context.Context, []types.BoundContract) error) *MockContractReader_Bind_Call { - _c.Call.Return(run) - return _c -} - -// Close provides a mock function with given fields: -func (_m *MockContractReader) Close() error { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for Close") - } - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MockContractReader_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' -type MockContractReader_Close_Call struct { - *mock.Call -} - -// Close is a helper method to define mock.On call -func (_e *MockContractReader_Expecter) Close() *MockContractReader_Close_Call { - return &MockContractReader_Close_Call{Call: _e.mock.On("Close")} -} - -func (_c *MockContractReader_Close_Call) Run(run func()) *MockContractReader_Close_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockContractReader_Close_Call) Return(_a0 error) *MockContractReader_Close_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockContractReader_Close_Call) RunAndReturn(run func() error) *MockContractReader_Close_Call { - _c.Call.Return(run) - return _c -} - -// GetLatestValueWithHeadData provides a mock function with given fields: ctx, readName, confidenceLevel, params, returnVal -func (_m *MockContractReader) GetLatestValueWithHeadData(ctx context.Context, readName string, confidenceLevel primitives.ConfidenceLevel, params any, returnVal any) (*types.Head, error) { - ret := _m.Called(ctx, readName, confidenceLevel, params, returnVal) - - if len(ret) == 0 { - panic("no return value specified for GetLatestValueWithHeadData") - } - - var r0 *types.Head - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, primitives.ConfidenceLevel, any, any) (*types.Head, error)); ok { - return rf(ctx, readName, confidenceLevel, params, returnVal) - } - if rf, ok := ret.Get(0).(func(context.Context, string, primitives.ConfidenceLevel, any, any) *types.Head); ok { - r0 = rf(ctx, readName, confidenceLevel, params, returnVal) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.Head) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, string, primitives.ConfidenceLevel, any, any) error); ok { - r1 = rf(ctx, readName, confidenceLevel, params, returnVal) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockContractReader_GetLatestValueWithHeadData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestValueWithHeadData' -type MockContractReader_GetLatestValueWithHeadData_Call struct { - *mock.Call -} - -// GetLatestValueWithHeadData is a helper method to define mock.On call -// - ctx context.Context -// - readName string -// - confidenceLevel primitives.ConfidenceLevel -// - params any -// - returnVal any -func (_e *MockContractReader_Expecter) GetLatestValueWithHeadData(ctx interface{}, readName interface{}, confidenceLevel interface{}, params interface{}, returnVal interface{}) *MockContractReader_GetLatestValueWithHeadData_Call { - return &MockContractReader_GetLatestValueWithHeadData_Call{Call: _e.mock.On("GetLatestValueWithHeadData", ctx, readName, confidenceLevel, params, returnVal)} -} - -func (_c *MockContractReader_GetLatestValueWithHeadData_Call) Run(run func(ctx context.Context, readName string, confidenceLevel primitives.ConfidenceLevel, params any, returnVal any)) *MockContractReader_GetLatestValueWithHeadData_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(primitives.ConfidenceLevel), args[3].(any), args[4].(any)) - }) - return _c -} - -func (_c *MockContractReader_GetLatestValueWithHeadData_Call) Return(head *types.Head, err error) *MockContractReader_GetLatestValueWithHeadData_Call { - _c.Call.Return(head, err) - return _c -} - -func (_c *MockContractReader_GetLatestValueWithHeadData_Call) RunAndReturn(run func(context.Context, string, primitives.ConfidenceLevel, any, any) (*types.Head, error)) *MockContractReader_GetLatestValueWithHeadData_Call { - _c.Call.Return(run) - return _c -} - -// QueryKey provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 -func (_m *MockContractReader) QueryKey(_a0 context.Context, _a1 types.BoundContract, _a2 query.KeyFilter, _a3 query.LimitAndSort, _a4 any) ([]types.Sequence, error) { - ret := _m.Called(_a0, _a1, _a2, _a3, _a4) - - if len(ret) == 0 { - panic("no return value specified for QueryKey") - } - - var r0 []types.Sequence - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error)); ok { - return rf(_a0, _a1, _a2, _a3, _a4) - } - if rf, ok := ret.Get(0).(func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) []types.Sequence); ok { - r0 = rf(_a0, _a1, _a2, _a3, _a4) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]types.Sequence) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) error); ok { - r1 = rf(_a0, _a1, _a2, _a3, _a4) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockContractReader_QueryKey_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueryKey' -type MockContractReader_QueryKey_Call struct { - *mock.Call -} - -// QueryKey is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 types.BoundContract -// - _a2 query.KeyFilter -// - _a3 query.LimitAndSort -// - _a4 any -func (_e *MockContractReader_Expecter) QueryKey(_a0 interface{}, _a1 interface{}, _a2 interface{}, _a3 interface{}, _a4 interface{}) *MockContractReader_QueryKey_Call { - return &MockContractReader_QueryKey_Call{Call: _e.mock.On("QueryKey", _a0, _a1, _a2, _a3, _a4)} -} - -func (_c *MockContractReader_QueryKey_Call) Run(run func(_a0 context.Context, _a1 types.BoundContract, _a2 query.KeyFilter, _a3 query.LimitAndSort, _a4 any)) *MockContractReader_QueryKey_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(types.BoundContract), args[2].(query.KeyFilter), args[3].(query.LimitAndSort), args[4].(any)) - }) - return _c -} - -func (_c *MockContractReader_QueryKey_Call) Return(_a0 []types.Sequence, _a1 error) *MockContractReader_QueryKey_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockContractReader_QueryKey_Call) RunAndReturn(run func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error)) *MockContractReader_QueryKey_Call { - _c.Call.Return(run) - return _c -} - -// Start provides a mock function with given fields: ctx -func (_m *MockContractReader) Start(ctx context.Context) error { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for Start") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context) error); ok { - r0 = rf(ctx) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MockContractReader_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start' -type MockContractReader_Start_Call struct { - *mock.Call -} - -// Start is a helper method to define mock.On call -// - ctx context.Context -func (_e *MockContractReader_Expecter) Start(ctx interface{}) *MockContractReader_Start_Call { - return &MockContractReader_Start_Call{Call: _e.mock.On("Start", ctx)} -} - -func (_c *MockContractReader_Start_Call) Run(run func(ctx context.Context)) *MockContractReader_Start_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *MockContractReader_Start_Call) Return(_a0 error) *MockContractReader_Start_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockContractReader_Start_Call) RunAndReturn(run func(context.Context) error) *MockContractReader_Start_Call { - _c.Call.Return(run) - return _c -} - -// NewMockContractReader creates a new instance of MockContractReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockContractReader(t interface { - mock.TestingT - Cleanup(func()) -}) *MockContractReader { - mock := &MockContractReader{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/core/services/workflows/syncer/heap.go b/core/services/workflows/syncer/heap.go deleted file mode 100644 index 061293928a3..00000000000 --- a/core/services/workflows/syncer/heap.go +++ /dev/null @@ -1,63 +0,0 @@ -package syncer - -import "container/heap" - -type Heap interface { - // Push adds a new item to the heap. - Push(x WorkflowRegistryEventResponse) - - // Pop removes the smallest item from the heap and returns it. - Pop() WorkflowRegistryEventResponse - - // Len returns the number of items in the heap. - Len() int -} - -// publicHeap is a wrapper around the heap.Interface that exposes the Push and Pop methods. -type publicHeap[T any] struct { - heap heap.Interface -} - -func (h *publicHeap[T]) Push(x T) { - heap.Push(h.heap, x) -} - -func (h *publicHeap[T]) Pop() T { - return heap.Pop(h.heap).(T) -} - -func (h *publicHeap[T]) Len() int { - return h.heap.Len() -} - -// blockHeightHeap is a heap.Interface that sorts WorkflowRegistryEventResponses by block height. -type blockHeightHeap []WorkflowRegistryEventResponse - -// newBlockHeightHeap returns an initialized heap that sorts WorkflowRegistryEventResponses by block height. -func newBlockHeightHeap() Heap { - h := blockHeightHeap(make([]WorkflowRegistryEventResponse, 0)) - heap.Init(&h) - return &publicHeap[WorkflowRegistryEventResponse]{heap: &h} -} - -func (h *blockHeightHeap) Len() int { return len(*h) } - -func (h *blockHeightHeap) Less(i, j int) bool { - return (*h)[i].Event.Head.Height < (*h)[j].Event.Head.Height -} - -func (h *blockHeightHeap) Swap(i, j int) { - (*h)[i], (*h)[j] = (*h)[j], (*h)[i] -} - -func (h *blockHeightHeap) Push(x any) { - *h = append(*h, x.(WorkflowRegistryEventResponse)) -} - -func (h *blockHeightHeap) Pop() any { - old := *h - n := len(old) - x := old[n-1] - *h = old[0 : n-1] - return x -} diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 75fcc9735ad..223fbe8e758 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -5,13 +5,14 @@ import ( "encoding/hex" "encoding/json" "fmt" + "iter" "sync" "time" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/services" - types "github.com/smartcontractkit/chainlink-common/pkg/types" - query "github.com/smartcontractkit/chainlink-common/pkg/types/query" + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" @@ -90,19 +91,19 @@ type WorkflowLoadConfig struct { // FetcherFunc is an abstraction for fetching the contents stored at a URL. type FetcherFunc func(ctx context.Context, url string) ([]byte, error) -type ContractReaderFactory interface { - NewContractReader(context.Context, []byte) (types.ContractReader, error) -} - // ContractReader is a subset of types.ContractReader defined locally to enable mocking. type ContractReader interface { Start(ctx context.Context) error Close() error Bind(context.Context, []types.BoundContract) error - QueryKey(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error) + QueryKeys(ctx context.Context, keyQueries []types.ContractKeyFilter, limitAndSort query.LimitAndSort) (iter.Seq2[string, types.Sequence], error) GetLatestValueWithHeadData(ctx context.Context, readName string, confidenceLevel primitives.ConfidenceLevel, params any, returnVal any) (head *types.Head, err error) } +type ContractReaderFactory interface { + NewContractReader(context.Context, []byte) (types.ContractReader, error) +} + // WorkflowRegistrySyncer is the public interface of the package. type WorkflowRegistrySyncer interface { services.Service @@ -128,21 +129,11 @@ type workflowRegistry struct { newContractReaderFn newContractReaderFn - eventPollerCfg WorkflowEventPollerConfig - eventTypes []WorkflowRegistryEventType - - // eventsCh is read by the handler and each event is handled once received. - eventsCh chan WorkflowRegistryEventResponse + eventPollerCfg WorkflowEventPollerConfig + eventTypes []WorkflowRegistryEventType handler evtHandler initialWorkflowsStateLoader initialWorkflowsStateLoader - // batchCh is a channel that receives batches of events from the contract query goroutines. - batchCh chan []WorkflowRegistryEventResponse - - // heap is a min heap that merges batches of events from the contract query goroutines. The - // default min heap is sorted by block height. - heap Heap - workflowDonNotifier donNotifier reader ContractReader @@ -197,11 +188,8 @@ func NewWorkflowRegistry( newContractReaderFn: newContractReaderFn, workflowRegistryAddress: addr, eventPollerCfg: eventPollerConfig, - heap: newBlockHeightHeap(), stopCh: make(services.StopChan), eventTypes: ets, - eventsCh: make(chan WorkflowRegistryEventResponse), - batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), handler: handler, initialWorkflowsStateLoader: initialWorkflowsStateLoader, workflowDonNotifier: workflowDonNotifier, @@ -238,15 +226,13 @@ func (w *workflowRegistry) Start(_ context.Context) error { return } - w.syncEventsLoop(ctx, loadWorkflowsHead.Height) - }() - - w.wg.Add(1) - go func() { - defer w.wg.Done() - defer cancel() + reader, err := w.getContractReader(ctx) + if err != nil { + w.lggr.Criticalf("contract reader unavailable : %s", err) + return + } - w.handlerLoop(ctx) + w.readRegistryEvents(ctx, reader, loadWorkflowsHead.Height) }() return nil @@ -273,135 +259,82 @@ func (w *workflowRegistry) Name() string { return name } -// handlerLoop handles the events that are emitted by the contract. -func (w *workflowRegistry) handlerLoop(ctx context.Context) { +// readRegistryEvents polls the contract for events and send them to the events channel. +func (w *workflowRegistry) readRegistryEvents(ctx context.Context, reader ContractReader, lastReadBlockNumber string) { + ticker := w.getTicker() + + var keyQueries = make([]types.ContractKeyFilter, 0, len(w.eventTypes)) + for _, et := range w.eventTypes { + var logData values.Value + keyQueries = append(keyQueries, types.ContractKeyFilter{ + KeyFilter: query.KeyFilter{ + Key: string(et), + Expressions: []query.Expression{ + query.Confidence(primitives.Finalized), + query.Block(lastReadBlockNumber, primitives.Gt), + }, + }, + Contract: types.BoundContract{ + Name: WorkflowRegistryContractName, + Address: w.workflowRegistryAddress, + }, + SequenceDataType: &logData, + }) + } + + cursor := "" for { select { case <-ctx.Done(): return - case resp, open := <-w.eventsCh: - if !open { - return + case <-ticker: + limitAndSort := query.LimitAndSort{ + SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, + Limit: query.Limit{Count: w.eventPollerCfg.QueryCount}, } - - if resp.Err != nil || resp.Event == nil { - w.lggr.Errorw("failed to handle event", "err", resp.Err) - continue + if cursor != "" { + limitAndSort.Limit = query.CursorLimit(cursor, query.CursorFollowing, w.eventPollerCfg.QueryCount) } - event := resp.Event - w.lggr.Debugf("handling event: %+v", event) - if err := w.handler.Handle(ctx, *event); err != nil { - w.lggr.Errorw("failed to handle event", "event", event, "err", err) + logsIter, err := reader.QueryKeys(ctx, keyQueries, limitAndSort) + if err != nil { + w.lggr.Errorw("failed to query keys", "err", err) continue } - } - } -} -// syncEventsLoop polls the contract for events and passes them to a channel for handling. -func (w *workflowRegistry) syncEventsLoop(ctx context.Context, lastReadBlockNumber string) { - var ( - // sendLog is a helper that sends a WorkflowRegistryEventResponse to the eventsCh in a - // blocking way that will send the response or be canceled. - sendLog = func(resp WorkflowRegistryEventResponse) { - select { - case w.eventsCh <- resp: - case <-ctx.Done(): + var logs []sequenceWithEventType + for eventType, log := range logsIter { + logs = append(logs, sequenceWithEventType{ + Sequence: log, + EventType: WorkflowRegistryEventType(eventType), + }) } - } - - ticker = w.getTicker() - - signals = make(map[WorkflowRegistryEventType]chan struct{}, 0) - ) - - // critical failure if there is no reader, the loop will exit and the parent context will be - // canceled. - reader, err := w.getContractReader(ctx) - if err != nil { - w.lggr.Criticalf("contract reader unavailable : %s", err) - return - } - - // fan out and query for each event type - for i := 0; i < len(w.eventTypes); i++ { - signal := make(chan struct{}, 1) - signals[w.eventTypes[i]] = signal - w.wg.Add(1) - go func() { - defer w.wg.Done() - - queryEvent( - ctx, - signal, - w.lggr, - reader, - lastReadBlockNumber, - queryEventConfig{ - ContractName: WorkflowRegistryContractName, - ContractAddress: w.workflowRegistryAddress, - WorkflowEventPollerConfig: w.eventPollerCfg, - }, - w.eventTypes[i], - w.batchCh, - ) - }() - } + w.lggr.Debugw("QueryKeys called", "logs", len(logs), "eventTypes", w.eventTypes, "lastReadBlockNumber", lastReadBlockNumber, "logCursor", cursor) - // Periodically send a signal to all the queryEvent goroutines to query the contract - for { - select { - case <-ctx.Done(): - return - case <-ticker: - w.lggr.Debugw("Syncing with WorkflowRegistry") - // for each event type, send a signal for it to execute a query and produce a new - // batch of event logs - for i := 0; i < len(w.eventTypes); i++ { - signal := signals[w.eventTypes[i]] - select { - case signal <- struct{}{}: - case <-ctx.Done(): - return - } + // ChainReader QueryKey API provides logs including the cursor value and not + // after the cursor value. If the response only consists of the log corresponding + // to the cursor and no log after it, then we understand that there are no new + // logs + if len(logs) == 1 && logs[0].Sequence.Cursor == cursor { + w.lggr.Infow("No new logs since", "cursor", cursor) + continue } - // block on fan-in until all fetched event logs are sent to the handlers - w.orderAndSend( - ctx, - len(w.eventTypes), - w.batchCh, - sendLog, - ) - } - } -} + var events []WorkflowRegistryEventResponse + for _, log := range logs { + if log.Sequence.Cursor == cursor { + continue + } -// orderAndSend reads n batches from the batch channel, heapifies all the batches then dequeues -// the min heap via the sendLog function. -func (w *workflowRegistry) orderAndSend( - ctx context.Context, - batchCount int, - batchCh <-chan []WorkflowRegistryEventResponse, - sendLog func(WorkflowRegistryEventResponse), -) { - for { - select { - case <-ctx.Done(): - return - case batch := <-batchCh: - for _, response := range batch { - w.heap.Push(response) + events = append(events, toWorkflowRegistryEventResponse(log.Sequence, log.EventType, w.lggr)) + cursor = log.Sequence.Cursor } - batchCount-- - // If we have received responses for all the events, then we can drain the heap. - if batchCount == 0 { - for w.heap.Len() > 0 { - sendLog(w.heap.Pop()) + for _, event := range events { + err := w.handler.Handle(ctx, event.Event) + if err != nil { + w.lggr.Errorw("failed to handle event", "err", err) } - return } } } @@ -437,95 +370,9 @@ func (w *workflowRegistry) getContractReader(ctx context.Context) (ContractReade return w.reader, nil } -type queryEventConfig struct { - ContractName string - ContractAddress string - WorkflowEventPollerConfig -} - -// queryEvent queries the contract for events of the given type on each tick from the ticker. -// Sends a batch of event logs to the batch channel. The batch represents all the -// event logs read since the last query. Loops until the context is canceled. -func queryEvent( - ctx context.Context, - ticker <-chan struct{}, - lggr logger.Logger, - reader ContractReader, - lastReadBlockNumber string, - cfg queryEventConfig, - et WorkflowRegistryEventType, - batchCh chan<- []WorkflowRegistryEventResponse, -) { - // create query - var ( - logData values.Value - cursor = "" - limitAndSort = query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: cfg.QueryCount}, - } - bc = types.BoundContract{ - Name: cfg.ContractName, - Address: cfg.ContractAddress, - } - ) - - // Loop until canceled - for { - select { - case <-ctx.Done(): - return - case <-ticker: - responseBatch := []WorkflowRegistryEventResponse{} - - if cursor != "" { - limitAndSort.Limit = query.CursorLimit(cursor, query.CursorFollowing, cfg.QueryCount) - } - - logs, err := reader.QueryKey( - ctx, - bc, - query.KeyFilter{ - Key: string(et), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block(lastReadBlockNumber, primitives.Gte), - }, - }, - limitAndSort, - &logData, - ) - lcursor := cursor - if lcursor == "" { - lcursor = "empty" - } - lggr.Debugw("QueryKeys called", "logs", len(logs), "eventType", et, "lastReadBlockNumber", lastReadBlockNumber, "logCursor", lcursor) - - if err != nil { - lggr.Errorw("QueryKey failure", "err", err) - continue - } - - // ChainReader QueryKey API provides logs including the cursor value and not - // after the cursor value. If the response only consists of the log corresponding - // to the cursor and no log after it, then we understand that there are no new - // logs - if len(logs) == 1 && logs[0].Cursor == cursor { - lggr.Infow("No new logs since", "cursor", cursor) - continue - } - - for _, log := range logs { - if log.Cursor == cursor { - continue - } - - responseBatch = append(responseBatch, toWorkflowRegistryEventResponse(log, et, lggr)) - cursor = log.Cursor - } - batchCh <- responseBatch - } - } +type sequenceWithEventType struct { + Sequence types.Sequence + EventType WorkflowRegistryEventType } func getWorkflowRegistryEventReader( @@ -681,7 +528,7 @@ func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don var workflows GetWorkflowMetadataListByDONReturnVal headAtLastRead, err = contractReader.GetLatestValueWithHeadData(ctx, readIdentifier, primitives.Finalized, params, &workflows) if err != nil { - return nil, fmt.Errorf("failed to get workflow metadata for don %w", err) + return nil, fmt.Errorf("failed to get lastest value with head data %w", err) } l.lggr.Debugw("Rehydrating existing workflows", "len", len(workflows.WorkflowMetadataList)) diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go deleted file mode 100644 index 621d3d123d5..00000000000 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ /dev/null @@ -1,234 +0,0 @@ -package syncer - -import ( - "context" - "encoding/hex" - "testing" - "time" - - "github.com/stretchr/testify/mock" - - "github.com/jonboulle/clockwork" - - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - "github.com/smartcontractkit/chainlink-common/pkg/custmsg" - "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" - types "github.com/smartcontractkit/chainlink-common/pkg/types" - query "github.com/smartcontractkit/chainlink-common/pkg/types/query" - "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" - "github.com/smartcontractkit/chainlink-common/pkg/values" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey" - "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" - "github.com/smartcontractkit/chainlink/v2/core/utils/matches" - - "github.com/stretchr/testify/require" -) - -type testDonNotifier struct { - don capabilities.DON - err error -} - -func (t *testDonNotifier) WaitForDon(ctx context.Context) (capabilities.DON, error) { - return t.don, t.err -} - -func Test_Workflow_Registry_Syncer(t *testing.T) { - var ( - giveContents = "contents" - wantContents = "updated contents" - contractAddress = "0xdeadbeef" - giveCfg = WorkflowEventPollerConfig{ - QueryCount: 20, - } - giveURL = "http://example.com" - giveHash, err = crypto.Keccak256([]byte(giveURL)) - - giveLog = types.Sequence{ - Data: map[string]any{ - "SecretsURLHash": giveHash, - "Owner": "0xowneraddr", - }, - Cursor: "cursor", - } - ) - - require.NoError(t, err) - - var ( - lggr = logger.TestLogger(t) - db = pgtest.NewSqlxDB(t) - orm = &orm{ds: db, lggr: lggr} - ctx, cancel = context.WithCancel(testutils.Context(t)) - reader = NewMockContractReader(t) - emitter = custmsg.NewLabeler() - gateway = func(_ context.Context, _ string) ([]byte, error) { - return []byte(wantContents), nil - } - ticker = make(chan time.Time) - - handler = NewEventHandler(lggr, orm, gateway, nil, nil, - emitter, clockwork.NewFakeClock(), workflowkey.Key{}) - loader = NewWorkflowRegistryContractLoader(lggr, contractAddress, func(ctx context.Context, bytes []byte) (ContractReader, error) { - return reader, nil - }, handler) - - worker = NewWorkflowRegistry(lggr, func(ctx context.Context, bytes []byte) (ContractReader, error) { - return reader, nil - }, contractAddress, - WorkflowEventPollerConfig{ - QueryCount: 20, - }, handler, loader, - &testDonNotifier{ - don: capabilities.DON{ - ID: 1, - }, - err: nil, - }, - WithTicker(ticker)) - ) - - // Cleanup the worker - defer cancel() - - // Seed the DB with an original entry - _, err = orm.Create(ctx, giveURL, hex.EncodeToString(giveHash), giveContents) - require.NoError(t, err) - - // Mock out the contract reader query - reader.EXPECT().QueryKey( - matches.AnyContext, - types.BoundContract{ - Name: WorkflowRegistryContractName, - Address: contractAddress, - }, - query.KeyFilter{ - Key: string(ForceUpdateSecretsEvent), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block("0", primitives.Gte), - }, - }, - query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: giveCfg.QueryCount}, - }, - new(values.Value), - ).Return([]types.Sequence{giveLog}, nil) - reader.EXPECT().QueryKey( - matches.AnyContext, - types.BoundContract{ - Name: WorkflowRegistryContractName, - Address: contractAddress, - }, - query.KeyFilter{ - Key: string(WorkflowPausedEvent), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block("0", primitives.Gte), - }, - }, - query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: giveCfg.QueryCount}, - }, - new(values.Value), - ).Return([]types.Sequence{}, nil) - reader.EXPECT().QueryKey( - matches.AnyContext, - types.BoundContract{ - Name: WorkflowRegistryContractName, - Address: contractAddress, - }, - query.KeyFilter{ - Key: string(WorkflowDeletedEvent), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block("0", primitives.Gte), - }, - }, - query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: giveCfg.QueryCount}, - }, - new(values.Value), - ).Return([]types.Sequence{}, nil) - reader.EXPECT().QueryKey( - matches.AnyContext, - types.BoundContract{ - Name: WorkflowRegistryContractName, - Address: contractAddress, - }, - query.KeyFilter{ - Key: string(WorkflowActivatedEvent), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block("0", primitives.Gte), - }, - }, - query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: giveCfg.QueryCount}, - }, - new(values.Value), - ).Return([]types.Sequence{}, nil) - reader.EXPECT().QueryKey( - matches.AnyContext, - types.BoundContract{ - Name: WorkflowRegistryContractName, - Address: contractAddress, - }, - query.KeyFilter{ - Key: string(WorkflowUpdatedEvent), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block("0", primitives.Gte), - }, - }, - query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: giveCfg.QueryCount}, - }, - new(values.Value), - ).Return([]types.Sequence{}, nil) - reader.EXPECT().QueryKey( - matches.AnyContext, - types.BoundContract{ - Name: WorkflowRegistryContractName, - Address: contractAddress, - }, - query.KeyFilter{ - Key: string(WorkflowRegisteredEvent), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block("0", primitives.Gte), - }, - }, - query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: giveCfg.QueryCount}, - }, - new(values.Value), - ).Return([]types.Sequence{}, nil) - reader.EXPECT().GetLatestValueWithHeadData(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&types.Head{ - Height: "0", - }, nil) - reader.EXPECT().Start(mock.Anything).Return(nil) - reader.EXPECT().Bind(mock.Anything, mock.Anything).Return(nil) - - // Go run the worker - servicetest.Run(t, worker) - - // Send a tick to start a query - ticker <- time.Now() - - // Require the secrets contents to eventually be updated - require.Eventually(t, func() bool { - secrets, err := orm.GetContents(ctx, giveURL) - require.NoError(t, err) - return secrets == wantContents - }, 5*time.Second, time.Second) -} From eaeb2ebe7bfc53572655be322b793f0bf9556e1e Mon Sep 17 00:00:00 2001 From: krehermann <16602512+krehermann@users.noreply.github.com> Date: Thu, 12 Dec 2024 10:26:36 -0700 Subject: [PATCH 3/5] [KS-616] asset job updates (#15573) * fix error handling * add test * idempotent registry * add changeset * fix tests * isolate fix to mercury; more tests --- .changeset/big-camels-report.md | 5 + core/services/ocr2/plugins/mercury/plugin.go | 89 ++++++++---- .../ocr2/plugins/mercury/plugin_test.go | 133 ++++++++++++++++-- plugins/registrar.go | 2 +- 4 files changed, 190 insertions(+), 39 deletions(-) create mode 100644 .changeset/big-camels-report.md diff --git a/.changeset/big-camels-report.md b/.changeset/big-camels-report.md new file mode 100644 index 00000000000..f81f66b9138 --- /dev/null +++ b/.changeset/big-camels-report.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#bugfix fix non-idempotent loopp registry.Register diff --git a/core/services/ocr2/plugins/mercury/plugin.go b/core/services/ocr2/plugins/mercury/plugin.go index 8a4101804dd..b0983e55c89 100644 --- a/core/services/ocr2/plugins/mercury/plugin.go +++ b/core/services/ocr2/plugins/mercury/plugin.go @@ -1,6 +1,7 @@ package mercury import ( + "context" "encoding/json" "fmt" "os/exec" @@ -79,14 +80,13 @@ func NewServices( return nil, errors.New("expected job to have a non-nil PipelineSpec") } - var err error var pluginConfig config.PluginConfig if len(jb.OCR2OracleSpec.PluginConfig) == 0 { if !enableTriggerCapability { return nil, fmt.Errorf("at least one transmission option must be configured") } } else { - err = json.Unmarshal(jb.OCR2OracleSpec.PluginConfig.Bytes(), &pluginConfig) + err := json.Unmarshal(jb.OCR2OracleSpec.PluginConfig.Bytes(), &pluginConfig) if err != nil { return nil, errors.WithStack(err) } @@ -101,8 +101,8 @@ func NewServices( // encapsulate all the subservices and ensure we close them all if any fail to start srvs := []job.ServiceCtx{ocr2Provider} abort := func() { - if err = services.MultiCloser(srvs).Close(); err != nil { - lggr.Errorw("Error closing unused services", "err", err) + if cerr := services.MultiCloser(srvs).Close(); cerr != nil { + lggr.Errorw("Error closing unused services", "err", cerr) } } saver := ocrcommon.NewResultRunSaver(pipelineRunner, lggr, cfg.MaxSuccessfulRuns(), cfg.ResultWriteQueueDepth()) @@ -112,6 +112,7 @@ func NewServices( var ( factory ocr3types.MercuryPluginFactory factoryServices []job.ServiceCtx + fErr error ) fCfg := factoryCfg{ orm: orm, @@ -127,31 +128,31 @@ func NewServices( } switch feedID.Version() { case 1: - factory, factoryServices, err = newv1factory(fCfg) - if err != nil { + factory, factoryServices, fErr = newv1factory(fCfg) + if fErr != nil { abort() - return nil, fmt.Errorf("failed to create mercury v1 factory: %w", err) + return nil, fmt.Errorf("failed to create mercury v1 factory: %w", fErr) } srvs = append(srvs, factoryServices...) case 2: - factory, factoryServices, err = newv2factory(fCfg) - if err != nil { + factory, factoryServices, fErr = newv2factory(fCfg) + if fErr != nil { abort() - return nil, fmt.Errorf("failed to create mercury v2 factory: %w", err) + return nil, fmt.Errorf("failed to create mercury v2 factory: %w", fErr) } srvs = append(srvs, factoryServices...) case 3: - factory, factoryServices, err = newv3factory(fCfg) - if err != nil { + factory, factoryServices, fErr = newv3factory(fCfg) + if fErr != nil { abort() - return nil, fmt.Errorf("failed to create mercury v3 factory: %w", err) + return nil, fmt.Errorf("failed to create mercury v3 factory: %w", fErr) } srvs = append(srvs, factoryServices...) case 4: - factory, factoryServices, err = newv4factory(fCfg) - if err != nil { + factory, factoryServices, fErr = newv4factory(fCfg) + if fErr != nil { abort() - return nil, fmt.Errorf("failed to create mercury v4 factory: %w", err) + return nil, fmt.Errorf("failed to create mercury v4 factory: %w", fErr) } srvs = append(srvs, factoryServices...) default: @@ -214,13 +215,14 @@ func newv4factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. loopEnabled := loopCmd != "" if loopEnabled { - cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) if err != nil { return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) } // in loop mode, the factory is grpc server, and we need to handle the server lifecycle + // and unregistration of the loop factoryServer := loop.NewMercuryV4Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) - srvs = append(srvs, factoryServer) + srvs = append(srvs, factoryServer, unregisterer) // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle factory = factoryServer } else { @@ -253,13 +255,14 @@ func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. loopEnabled := loopCmd != "" if loopEnabled { - cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) if err != nil { return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) } // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + // and unregistration of the loop factoryServer := loop.NewMercuryV3Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) - srvs = append(srvs, factoryServer) + srvs = append(srvs, factoryServer, unregisterer) // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle factory = factoryServer } else { @@ -292,13 +295,14 @@ func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. loopEnabled := loopCmd != "" if loopEnabled { - cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) if err != nil { return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) } // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + // and unregistration of the loop factoryServer := loop.NewMercuryV2Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) - srvs = append(srvs, factoryServer) + srvs = append(srvs, factoryServer, unregisterer) // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle factory = factoryServer } else { @@ -329,13 +333,14 @@ func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. loopEnabled := loopCmd != "" if loopEnabled { - cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) if err != nil { return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) } // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + // and unregistration of the loop factoryServer := loop.NewMercuryV1Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) - srvs = append(srvs, factoryServer) + srvs = append(srvs, factoryServer, unregisterer) // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle factory = factoryServer } else { @@ -344,20 +349,46 @@ func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. return factory, srvs, nil } -func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, loop.GRPCOpts, logger.Logger, error) { +func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, *loopUnregisterCloser, loop.GRPCOpts, logger.Logger, error) { lggr.Debugw("Initializing Mercury loop", "command", cmd) mercuryLggr := lggr.Named(fmt.Sprintf("MercuryV%d", feedID.Version())).Named(feedID.String()) envVars, err := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get()) if err != nil { - return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err) + return nil, nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err) } + loopID := mercuryLggr.Name() cmdFn, opts, err := cfg.RegisterLOOP(plugins.CmdConfig{ - ID: mercuryLggr.Name(), + ID: loopID, Cmd: cmd, Env: envVars, }) if err != nil { - return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err) + return nil, nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err) + } + return cmdFn, newLoopUnregister(cfg, loopID), opts, mercuryLggr, nil +} + +// loopUnregisterCloser is a helper to unregister a loop +// as a service +// TODO BCF-3451 all other jobs that use custom plugin providers that should be refactored to use this pattern +// perhaps it can be implemented in the delegate on job delete. +type loopUnregisterCloser struct { + r plugins.RegistrarConfig + id string +} + +func (l *loopUnregisterCloser) Close() error { + l.r.UnregisterLOOP(l.id) + return nil +} + +func (l *loopUnregisterCloser) Start(ctx context.Context) error { + return nil +} + +func newLoopUnregister(r plugins.RegistrarConfig, id string) *loopUnregisterCloser { + return &loopUnregisterCloser{ + r: r, + id: id, } - return cmdFn, opts, mercuryLggr, nil } diff --git a/core/services/ocr2/plugins/mercury/plugin_test.go b/core/services/ocr2/plugins/mercury/plugin_test.go index 22aaf7522de..eb67da53100 100644 --- a/core/services/ocr2/plugins/mercury/plugin_test.go +++ b/core/services/ocr2/plugins/mercury/plugin_test.go @@ -2,6 +2,7 @@ package mercury_test import ( "context" + "errors" "os/exec" "reflect" "testing" @@ -9,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/config/env" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -22,6 +24,7 @@ import ( v2 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v2" v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" v4 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v4" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" mercuryocr2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury" @@ -92,21 +95,23 @@ var ( // this is kind of gross, but it's the best way to test return values of the services expectedEmbeddedServiceCnt = 3 - expectedLoopServiceCnt = expectedEmbeddedServiceCnt + 1 + expectedLoopServiceCnt = expectedEmbeddedServiceCnt + 2 // factory server and loop unregisterer ) func TestNewServices(t *testing.T) { type args struct { pluginConfig job.JSONConfig feedID utils.FeedID + cfg mercuryocr2.Config } - tests := []struct { + testCases := []struct { name string args args loopMode bool wantLoopFactory any wantServiceCnt int wantErr bool + wantErrStr string }{ { name: "no plugin config error ", @@ -186,6 +191,19 @@ func TestNewServices(t *testing.T) { wantErr: false, wantLoopFactory: &loop.MercuryV3Service{}, }, + { + name: "v3 loop err", + loopMode: true, + args: args{ + pluginConfig: v3jsonCfg, + feedID: v3FeedId, + cfg: mercuryocr2.NewMercuryConfig(1, 1, &testRegistrarConfig{failRegister: true}), + }, + wantServiceCnt: expectedLoopServiceCnt, + wantErr: true, + wantLoopFactory: &loop.MercuryV3Service{}, + wantErrStr: "failed to init loop for feed", + }, { name: "v4 loop", loopMode: true, @@ -198,17 +216,27 @@ func TestNewServices(t *testing.T) { wantLoopFactory: &loop.MercuryV4Service{}, }, } - for _, tt := range tests { + for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { if tt.loopMode { t.Setenv(string(env.MercuryPlugin.Cmd), "fake_cmd") assert.NotEmpty(t, env.MercuryPlugin.Cmd.Get()) } - got, err := newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID) + // use default config if not provided + if tt.args.cfg == nil { + tt.args.cfg = testCfg + } + got, err := newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID, tt.args.cfg) if (err != nil) != tt.wantErr { t.Errorf("NewServices() error = %v, wantErr %v", err, tt.wantErr) return } + if err != nil { + if tt.wantErrStr != "" { + assert.Contains(t, err.Error(), tt.wantErrStr) + } + return + } assert.Len(t, got, tt.wantServiceCnt) if tt.loopMode { foundLoopFactory := false @@ -222,15 +250,97 @@ func TestNewServices(t *testing.T) { } }) } + + t.Run("restartable loop", func(t *testing.T) { + // setup a real loop registry to test restartability + registry := plugins.NewLoopRegistry(logger.TestLogger(t), nil, nil, nil, "") + loopRegistrarConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, registry.Register, registry.Unregister) + prodCfg := mercuryocr2.NewMercuryConfig(1, 1, loopRegistrarConfig) + type args struct { + pluginConfig job.JSONConfig + feedID utils.FeedID + cfg mercuryocr2.Config + } + testCases := []struct { + name string + args args + wantErr bool + }{ + { + name: "v1 loop", + args: args{ + pluginConfig: v1jsonCfg, + feedID: v1FeedId, + cfg: prodCfg, + }, + wantErr: false, + }, + { + name: "v2 loop", + args: args{ + pluginConfig: v2jsonCfg, + feedID: v2FeedId, + cfg: prodCfg, + }, + wantErr: false, + }, + { + name: "v3 loop", + args: args{ + pluginConfig: v3jsonCfg, + feedID: v3FeedId, + cfg: prodCfg, + }, + wantErr: false, + }, + { + name: "v4 loop", + args: args{ + pluginConfig: v4jsonCfg, + feedID: v4FeedId, + cfg: prodCfg, + }, + wantErr: false, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + t.Setenv(string(env.MercuryPlugin.Cmd), "fake_cmd") + assert.NotEmpty(t, env.MercuryPlugin.Cmd.Get()) + + got, err := newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID, tt.args.cfg) + if (err != nil) != tt.wantErr { + t.Errorf("NewServices() error = %v, wantErr %v", err, tt.wantErr) + return + } + // hack to simulate a restart. we don't have enough boilerplate to start the oracle service + // only care about the subservices so we start all except the oracle, which happens to be the last one + for i := 0; i < len(got)-1; i++ { + require.NoError(t, got[i].Start(tests.Context(t))) + } + // if we don't close the services, we get conflicts with the loop registry + _, err = newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID, tt.args.cfg) + require.ErrorContains(t, err, "plugin already registered") + + // close all services and try again + for i := len(got) - 2; i >= 0; i-- { + require.NoError(t, got[i].Close()) + } + _, err = newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID, tt.args.cfg) + require.NoError(t, err) + }) + } + }) } // we are only varying the version via feedID (and the plugin config) // this wrapper supplies dummy values for the rest of the arguments -func newServicesTestWrapper(t *testing.T, pluginConfig job.JSONConfig, feedID utils.FeedID) ([]job.ServiceCtx, error) { +func newServicesTestWrapper(t *testing.T, pluginConfig job.JSONConfig, feedID utils.FeedID, cfg mercuryocr2.Config) ([]job.ServiceCtx, error) { t.Helper() jb := testJob jb.OCR2OracleSpec.PluginConfig = pluginConfig - return mercuryocr2.NewServices(jb, &testProvider{}, nil, logger.TestLogger(t), testArgsNoPlugin, testCfg, nil, &testDataSourceORM{}, feedID, false) + return mercuryocr2.NewServices(jb, &testProvider{}, nil, logger.TestLogger(t), testArgsNoPlugin, cfg, nil, &testDataSourceORM{}, feedID, false) } type testProvider struct{} @@ -292,16 +402,21 @@ func (*testProvider) ReportCodecV3() v3.ReportCodec { return nil } func (*testProvider) ReportCodecV4() v4.ReportCodec { return nil } // Start implements types.MercuryProvider. -func (*testProvider) Start(context.Context) error { panic("unimplemented") } +func (*testProvider) Start(context.Context) error { return nil } var _ commontypes.MercuryProvider = (*testProvider)(nil) -type testRegistrarConfig struct{} +type testRegistrarConfig struct { + failRegister bool +} func (c *testRegistrarConfig) UnregisterLOOP(ID string) {} // RegisterLOOP implements plugins.RegistrarConfig. -func (*testRegistrarConfig) RegisterLOOP(config plugins.CmdConfig) (func() *exec.Cmd, loop.GRPCOpts, error) { +func (c *testRegistrarConfig) RegisterLOOP(config plugins.CmdConfig) (func() *exec.Cmd, loop.GRPCOpts, error) { + if c.failRegister { + return nil, loop.GRPCOpts{}, errors.New("failed to register") + } return nil, loop.GRPCOpts{}, nil } diff --git a/plugins/registrar.go b/plugins/registrar.go index 2a82f2a6204..8523d3980cc 100644 --- a/plugins/registrar.go +++ b/plugins/registrar.go @@ -6,7 +6,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/loop" ) -// RegistrarConfig generates contains static configuration inher +// RegistrarConfig generates contains static configuration type RegistrarConfig interface { RegisterLOOP(config CmdConfig) (func() *exec.Cmd, loop.GRPCOpts, error) UnregisterLOOP(ID string) From acc38461e5e41790f54bd85a1d1f60c6e4a01ee3 Mon Sep 17 00:00:00 2001 From: Cedric Date: Thu, 12 Dec 2024 18:08:37 +0000 Subject: [PATCH 4/5] [CAPPL-382/CAPPL-366] Miscellaneous fixes (#15666) --- core/scripts/go.mod | 2 +- core/scripts/go.sum | 4 ++-- .../capabilities/workflows/syncer/workflow_syncer_test.go | 4 ++-- core/services/workflows/syncer/handler.go | 2 +- core/services/workflows/syncer/handler_test.go | 8 ++++---- deployment/go.mod | 2 +- deployment/go.sum | 4 ++-- go.mod | 2 +- go.sum | 4 ++-- integration-tests/go.mod | 2 +- integration-tests/go.sum | 4 ++-- integration-tests/load/go.mod | 2 +- integration-tests/load/go.sum | 4 ++-- 13 files changed, 22 insertions(+), 22 deletions(-) diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 6bab1f30f8e..d29df20e3fa 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -33,7 +33,7 @@ require ( github.com/prometheus/client_golang v1.20.5 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chainlink-automation v0.8.1 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83 + github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49 github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12 github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.19.0 diff --git a/core/scripts/go.sum b/core/scripts/go.sum index f86aad22fb4..c14563ea569 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1142,8 +1142,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0 h1:/1L+v4SxUD2K5RMRbfByyLfePMAgQKeD0onSetPnGmA= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0/go.mod h1:F8xQAIW0ymb2BZhqn89sWZLXreJhM5KDVF6Qb4y44N0= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83 h1:NjrU7KOn3Tk+C6QFo9tQBqeotPKytpBwhn/J1s+yiiY= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83/go.mod h1:bQktEJf7sJ0U3SmIcXvbGUox7SmXcnSEZ4kUbT8R5Nk= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49 h1:ZA92CTX9JtEArrxgZw7PNctVxFS+/DmSXumkwf1WiMY= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49/go.mod h1:bQktEJf7sJ0U3SmIcXvbGUox7SmXcnSEZ4kUbT8R5Nk= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241202141438-a90db35252db h1:N1RH1hSr2ACzOFc9hkCcjE8pRBTdcU3p8nsTJByaLes= diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 066e85e839f..c7c164803cb 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -418,7 +418,7 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyPaused(t *testing.T) { require.NoError(t, err) from := [20]byte(backendTH.ContractsOwner.From) - id, err := workflows.GenerateWorkflowID(from[:], []byte(wantContents), []byte(""), "") + id, err := workflows.GenerateWorkflowID(from[:], "test-wf", []byte(wantContents), []byte(""), "") require.NoError(t, err) giveWorkflow.ID = id @@ -516,7 +516,7 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) { require.NoError(t, err) from := [20]byte(backendTH.ContractsOwner.From) - id, err := workflows.GenerateWorkflowID(from[:], []byte(wantContents), []byte(""), "") + id, err := workflows.GenerateWorkflowID(from[:], "test-wf", []byte(wantContents), []byte(""), "") require.NoError(t, err) giveWorkflow.ID = id diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index f3392a8489a..4ef7f952249 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -428,7 +428,7 @@ func (h *eventHandler) workflowRegisteredEvent( } // Calculate the hash of the binary and config files - hash, err := pkgworkflows.GenerateWorkflowID(payload.WorkflowOwner, decodedBinary, config, payload.SecretsURL) + hash, err := pkgworkflows.GenerateWorkflowID(payload.WorkflowOwner, payload.WorkflowName, decodedBinary, config, payload.SecretsURL) if err != nil { return fmt.Errorf("failed to generate workflow id: %w", err) } diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index eb8b338158f..f205cbde1cd 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -444,7 +444,7 @@ func testRunningWorkflow(t *testing.T, tc testCase) { fetcher = tc.fetcher ) - giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, binary, config, secretsURL) + giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, secretsURL) require.NoError(t, err) wfID := hex.EncodeToString(giveWFID[:]) @@ -492,7 +492,7 @@ func Test_workflowDeletedHandler(t *testing.T) { }) ) - giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, binary, config, secretsURL) + giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, secretsURL) require.NoError(t, err) wfIDs := hex.EncodeToString(giveWFID[:]) @@ -584,9 +584,9 @@ func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) { }) ) - giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, binary, config, secretsURL) + giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, secretsURL) require.NoError(t, err) - updatedWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, binary, updateConfig, secretsURL) + updatedWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, updateConfig, secretsURL) require.NoError(t, err) require.NoError(t, err) diff --git a/deployment/go.mod b/deployment/go.mod index 8c30d54bdff..fc3d70c3900 100644 --- a/deployment/go.mod +++ b/deployment/go.mod @@ -29,7 +29,7 @@ require ( github.com/smartcontractkit/ccip-owner-contracts v0.0.0-20240926212305-a6deabdfce86 github.com/smartcontractkit/chain-selectors v1.0.34 github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83 + github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49 github.com/smartcontractkit/chainlink-protos/job-distributor v0.6.0 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.13 github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12 diff --git a/deployment/go.sum b/deployment/go.sum index b1ce805ba28..f9fb767d5da 100644 --- a/deployment/go.sum +++ b/deployment/go.sum @@ -1411,8 +1411,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0 h1:/1L+v4SxUD2K5RMRbfByyLfePMAgQKeD0onSetPnGmA= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0/go.mod h1:F8xQAIW0ymb2BZhqn89sWZLXreJhM5KDVF6Qb4y44N0= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83 h1:NjrU7KOn3Tk+C6QFo9tQBqeotPKytpBwhn/J1s+yiiY= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83/go.mod h1:bQktEJf7sJ0U3SmIcXvbGUox7SmXcnSEZ4kUbT8R5Nk= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49 h1:ZA92CTX9JtEArrxgZw7PNctVxFS+/DmSXumkwf1WiMY= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49/go.mod h1:bQktEJf7sJ0U3SmIcXvbGUox7SmXcnSEZ4kUbT8R5Nk= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241202141438-a90db35252db h1:N1RH1hSr2ACzOFc9hkCcjE8pRBTdcU3p8nsTJByaLes= diff --git a/go.mod b/go.mod index 2149898f15b..0f8a161768f 100644 --- a/go.mod +++ b/go.mod @@ -79,7 +79,7 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.34 github.com/smartcontractkit/chainlink-automation v0.8.1 github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83 + github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49 github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241202141438-a90db35252db github.com/smartcontractkit/chainlink-feeds v0.1.1 diff --git a/go.sum b/go.sum index 45a2dfab4fe..76127a91b4b 100644 --- a/go.sum +++ b/go.sum @@ -1125,8 +1125,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0 h1:/1L+v4SxUD2K5RMRbfByyLfePMAgQKeD0onSetPnGmA= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0/go.mod h1:F8xQAIW0ymb2BZhqn89sWZLXreJhM5KDVF6Qb4y44N0= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83 h1:NjrU7KOn3Tk+C6QFo9tQBqeotPKytpBwhn/J1s+yiiY= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83/go.mod h1:bQktEJf7sJ0U3SmIcXvbGUox7SmXcnSEZ4kUbT8R5Nk= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49 h1:ZA92CTX9JtEArrxgZw7PNctVxFS+/DmSXumkwf1WiMY= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49/go.mod h1:bQktEJf7sJ0U3SmIcXvbGUox7SmXcnSEZ4kUbT8R5Nk= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241202141438-a90db35252db h1:N1RH1hSr2ACzOFc9hkCcjE8pRBTdcU3p8nsTJByaLes= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index c1b012e3641..b87192af47e 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -47,7 +47,7 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.34 github.com/smartcontractkit/chainlink-automation v0.8.1 github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83 + github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49 github.com/smartcontractkit/chainlink-protos/job-distributor v0.6.0 github.com/smartcontractkit/chainlink-testing-framework/havoc v1.50.2 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.19 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index fb3d895d130..75f4e862f61 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1432,8 +1432,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0 h1:/1L+v4SxUD2K5RMRbfByyLfePMAgQKeD0onSetPnGmA= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0/go.mod h1:F8xQAIW0ymb2BZhqn89sWZLXreJhM5KDVF6Qb4y44N0= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83 h1:NjrU7KOn3Tk+C6QFo9tQBqeotPKytpBwhn/J1s+yiiY= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83/go.mod h1:bQktEJf7sJ0U3SmIcXvbGUox7SmXcnSEZ4kUbT8R5Nk= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49 h1:ZA92CTX9JtEArrxgZw7PNctVxFS+/DmSXumkwf1WiMY= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49/go.mod h1:bQktEJf7sJ0U3SmIcXvbGUox7SmXcnSEZ4kUbT8R5Nk= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241202141438-a90db35252db h1:N1RH1hSr2ACzOFc9hkCcjE8pRBTdcU3p8nsTJByaLes= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 5f49519cb4b..3d240cccc9e 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -27,7 +27,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/rs/zerolog v1.33.0 github.com/slack-go/slack v0.15.0 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83 + github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.19 github.com/smartcontractkit/chainlink-testing-framework/seth v1.50.9 github.com/smartcontractkit/chainlink-testing-framework/wasp v1.50.2 diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index cda5cebf370..96861fdc048 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1423,8 +1423,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0 h1:/1L+v4SxUD2K5RMRbfByyLfePMAgQKeD0onSetPnGmA= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0/go.mod h1:F8xQAIW0ymb2BZhqn89sWZLXreJhM5KDVF6Qb4y44N0= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83 h1:NjrU7KOn3Tk+C6QFo9tQBqeotPKytpBwhn/J1s+yiiY= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83/go.mod h1:bQktEJf7sJ0U3SmIcXvbGUox7SmXcnSEZ4kUbT8R5Nk= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49 h1:ZA92CTX9JtEArrxgZw7PNctVxFS+/DmSXumkwf1WiMY= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49/go.mod h1:bQktEJf7sJ0U3SmIcXvbGUox7SmXcnSEZ4kUbT8R5Nk= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241202141438-a90db35252db h1:N1RH1hSr2ACzOFc9hkCcjE8pRBTdcU3p8nsTJByaLes= From 52f364a6cd842fe63c4cab6182f4c9fbbf7d134e Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Thu, 12 Dec 2024 13:30:33 -0500 Subject: [PATCH 5/5] Classify Arbitrum rpc server errors (#15488) * Add service errors * Add default service errors * Fix tests * Update giant-eels-jump.md * Update errors.go --- .changeset/giant-eels-jump.md | 5 +++++ core/chains/evm/client/errors.go | 15 +++++++++++++-- core/chains/evm/client/errors_test.go | 15 +++++++++++++++ 3 files changed, 33 insertions(+), 2 deletions(-) create mode 100644 .changeset/giant-eels-jump.md diff --git a/.changeset/giant-eels-jump.md b/.changeset/giant-eels-jump.md new file mode 100644 index 00000000000..5ab8ca875ca --- /dev/null +++ b/.changeset/giant-eels-jump.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Add error handling for Arbitrum RPC server timeouts. #added diff --git a/core/chains/evm/client/errors.go b/core/chains/evm/client/errors.go index 1075dc40606..bde97185580 100644 --- a/core/chains/evm/client/errors.go +++ b/core/chains/evm/client/errors.go @@ -64,6 +64,7 @@ const ( ServiceUnavailable TerminallyStuck TooManyResults + ServiceTimeout ) type ClientErrors map[int]*regexp.Regexp @@ -160,7 +161,8 @@ var arbitrum = ClientErrors{ Fatal: arbitrumFatal, L2FeeTooLow: regexp.MustCompile(`(: |^)max fee per gas less than block base fee(:|$)`), L2Full: regexp.MustCompile(`(: |^)(queue full|sequencer pending tx pool full, please try again)(:|$)`), - ServiceUnavailable: regexp.MustCompile(`(: |^)502 Bad Gateway: [\s\S]*$|network is unreachable|i/o timeout`), + ServiceUnavailable: regexp.MustCompile(`(: |^)502 Bad Gateway: [\s\S]*$|network is unreachable|i/o timeout|(: |^)503 Service Temporarily Unavailable(:|$)`), + ServiceTimeout: regexp.MustCompile(`(: |^)408 Request Timeout(:|$)`), } // Treasure @@ -398,6 +400,11 @@ func (s *SendError) IsServiceUnavailable(configErrors *ClientErrors) bool { return s.is(ServiceUnavailable, configErrors) || pkgerrors.Is(s.err, commonclient.ErroringNodeError) } +// IsServiceTimeout indicates if the error was caused by a service timeout +func (s *SendError) IsServiceTimeout(configErrors *ClientErrors) bool { + return s.is(ServiceTimeout, configErrors) +} + // IsTerminallyStuck indicates if a transaction was stuck without any chance of inclusion func (s *SendError) IsTerminallyStuckConfigError(configErrors *ClientErrors) bool { return s.is(TerminallyStuck, configErrors) @@ -619,6 +626,10 @@ func ClassifySendError(err error, clientErrors config.ClientErrors, lggr logger. lggr.Errorw(fmt.Sprintf("service unavailable while sending transaction %x", tx.Hash()), "err", sendError, "etx", tx) return commonclient.Retryable } + if sendError.IsServiceTimeout(configErrors) { + lggr.Errorw(fmt.Sprintf("service timed out while sending transaction %x", tx.Hash()), "err", sendError, "etx", tx) + return commonclient.Retryable + } if sendError.IsTimeout() { lggr.Errorw(fmt.Sprintf("timeout while sending transaction %x", tx.Hash()), "err", sendError, "etx", tx) return commonclient.Retryable @@ -666,7 +677,7 @@ var drpc = ClientErrors{ // Linkpool, Blockdaemon, and Chainstack all return "request timed out" if the log results are too large for them to process var defaultClient = ClientErrors{ - TooManyResults: regexp.MustCompile(`request timed out`), + TooManyResults: regexp.MustCompile(`request timed out|408 Request Timed Out`), } // JSON-RPC error codes which can indicate a refusal of the server to process an eth_getLogs request because the result set is too large diff --git a/core/chains/evm/client/errors_test.go b/core/chains/evm/client/errors_test.go index 75ac21597d8..1f9aaa53365 100644 --- a/core/chains/evm/client/errors_test.go +++ b/core/chains/evm/client/errors_test.go @@ -245,6 +245,7 @@ func Test_Eth_Errors(t *testing.T) { {"network is unreachable", true, "Arbitrum"}, {"client error service unavailable", true, "tomlConfig"}, {"[Request ID: 825608a8-fd8a-4b5b-aea7-92999509306d] Error invoking RPC: [Request ID: 825608a8-fd8a-4b5b-aea7-92999509306d] Transaction execution returns a null value for transaction", true, "hedera"}, + {"call failed: 503 Service Temporarily Unavailable: \r\n503 Service Temporarily Unavailable\r\n\r\n

503 Service Temporarily Unavailable

\r\n\r\n\r\n", true, "Arbitrum"}, } for _, test := range tests { err = evmclient.NewSendErrorS(test.message) @@ -260,6 +261,20 @@ func Test_Eth_Errors(t *testing.T) { } }) + t.Run("IsServiceTimeout", func(t *testing.T) { + tests := []errorCase{ + {"call failed: 408 Request Timeout: {", true, "Arbitrum"}, + {"408 Request Timeout: {\"id\":303,\"jsonrpc\":\"2.0\",\"error\":{\"code\\\":-32009,\\\"message\\\":\\\"request timeout\\\"}}\",\"errVerbose\":\"408 Request Timeout:\n", true, "Arbitrum"}, + {"request timeout", false, "tomlConfig"}, + } + for _, test := range tests { + err = evmclient.NewSendErrorS(test.message) + assert.Equal(t, err.IsServiceTimeout(clientErrors), test.expect) + err = newSendErrorWrapped(test.message) + assert.Equal(t, err.IsServiceTimeout(clientErrors), test.expect) + } + }) + t.Run("IsTxFeeExceedsCap", func(t *testing.T) { tests := []errorCase{ {"tx fee (1.10 ether) exceeds the configured cap (1.00 ether)", true, "geth"},