From 6cb989206821b34fc61e9cb33066ac863c17176b Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko <34754799+dhaidashenko@users.noreply.github.com> Date: Mon, 26 Aug 2024 23:37:21 +0200 Subject: [PATCH 1/3] Fix RPCClient Deadlock on Unsubscribe and NewHead (#14236) * Fix RPCClient Deadlock on Unsubscribe and NewHead * changeset (cherry picked from commit 0294e1f3813c0643b61af828ec438307dcab3123) --- .changeset/curly-birds-guess.md | 5 ++++ core/chains/evm/client/rpc_client.go | 15 +++++++----- core/chains/evm/client/rpc_client_test.go | 28 +++++++++++++++++++++++ 3 files changed, 42 insertions(+), 6 deletions(-) create mode 100644 .changeset/curly-birds-guess.md diff --git a/.changeset/curly-birds-guess.md b/.changeset/curly-birds-guess.md new file mode 100644 index 00000000000..c66bd541787 --- /dev/null +++ b/.changeset/curly-birds-guess.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Fixed deadlock in RPCClient causing CL Node to stop performing RPC requests for the affected chain #bugfix diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index 9ab5fd135b4..263bf75e439 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -132,6 +132,7 @@ type rpcClient struct { // stateMu since it can happen on state transitions as well as rpcClient Close. chStopInFlight chan struct{} + chainInfoLock sync.RWMutex // intercepted values seen by callers of the rpcClient excluding health check calls. Need to ensure MultiNode provides repeatable read guarantee highestUserObservations commonclient.ChainInfo // most recent chain info observed during current lifecycle (reseted on DisconnectAll) @@ -318,7 +319,9 @@ func (r *rpcClient) DisconnectAll() { } r.cancelInflightRequests() r.unsubscribeAll() + r.chainInfoLock.Lock() r.latestChainInfo = commonclient.ChainInfo{} + r.chainInfoLock.Unlock() } // unsubscribeAll unsubscribes all subscriptions @@ -1203,8 +1206,8 @@ func (r *rpcClient) onNewHead(ctx context.Context, requestCh <-chan struct{}, he return } - r.stateMu.Lock() - defer r.stateMu.Unlock() + r.chainInfoLock.Lock() + defer r.chainInfoLock.Unlock() if !commonclient.CtxIsHeathCheckRequest(ctx) { r.highestUserObservations.BlockNumber = max(r.highestUserObservations.BlockNumber, head.Number) r.highestUserObservations.TotalDifficulty = commonclient.MaxTotalDifficulty(r.highestUserObservations.TotalDifficulty, head.TotalDifficulty) @@ -1222,8 +1225,8 @@ func (r *rpcClient) onNewFinalizedHead(ctx context.Context, requestCh <-chan str if head == nil { return } - r.stateMu.Lock() - defer r.stateMu.Unlock() + r.chainInfoLock.Lock() + defer r.chainInfoLock.Unlock() if !commonclient.CtxIsHeathCheckRequest(ctx) { r.highestUserObservations.FinalizedBlockNumber = max(r.highestUserObservations.FinalizedBlockNumber, head.Number) } @@ -1236,8 +1239,8 @@ func (r *rpcClient) onNewFinalizedHead(ctx context.Context, requestCh <-chan str } func (r *rpcClient) GetInterceptedChainInfo() (latest, highestUserObservations commonclient.ChainInfo) { - r.stateMu.RLock() - defer r.stateMu.RUnlock() + r.chainInfoLock.RLock() + defer r.chainInfoLock.RUnlock() return r.latestChainInfo, r.highestUserObservations } diff --git a/core/chains/evm/client/rpc_client_test.go b/core/chains/evm/client/rpc_client_test.go index 682c4352457..6b3e76d47df 100644 --- a/core/chains/evm/client/rpc_client_test.go +++ b/core/chains/evm/client/rpc_client_test.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "net/url" + "sync" "testing" "github.com/ethereum/go-ethereum" @@ -126,6 +127,33 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { assert.Equal(t, int64(0), highestUserObservations.FinalizedBlockNumber) assert.Equal(t, (*big.Int)(nil), highestUserObservations.TotalDifficulty) }) + t.Run("Concurrent Unsubscribe and onNewHead calls do not lead to a deadlock", func(t *testing.T) { + const numberOfAttempts = 1000 // need a large number to increase the odds of reproducing the issue + server := testutils.NewWSServer(t, chainId, serverCallBack) + wsURL := server.WSURL() + + rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + defer rpc.Close() + require.NoError(t, rpc.Dial(ctx)) + var wg sync.WaitGroup + for i := 0; i < numberOfAttempts; i++ { + ch := make(chan *evmtypes.Head) + sub, err := rpc.SubscribeNewHead(tests.Context(t), ch) + require.NoError(t, err) + wg.Add(2) + go func() { + server.MustWriteBinaryMessageSync(t, makeNewHeadWSMessage(&evmtypes.Head{Number: 256, TotalDifficulty: big.NewInt(1000)})) + wg.Done() + }() + go func() { + rpc.UnsubscribeAllExceptAliveLoop() + sub.Unsubscribe() + wg.Done() + }() + wg.Wait() + } + + }) t.Run("Block's chain ID matched configured", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() From cba442dce731e34f3d2247dc7e4304d302c68659 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Tue, 27 Aug 2024 14:05:19 +0200 Subject: [PATCH 2/3] trigger GitHub actions From e2c5a02d815bf37c02ad6609759466a74cd08f60 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Tue, 27 Aug 2024 14:58:34 +0200 Subject: [PATCH 3/3] fix merge issues --- core/chains/evm/client/rpc_client_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/chains/evm/client/rpc_client_test.go b/core/chains/evm/client/rpc_client_test.go index 6b3e76d47df..a2b203eb859 100644 --- a/core/chains/evm/client/rpc_client_test.go +++ b/core/chains/evm/client/rpc_client_test.go @@ -132,7 +132,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) var wg sync.WaitGroup @@ -152,7 +152,6 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { }() wg.Wait() } - }) t.Run("Block's chain ID matched configured", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack)