Skip to content

Commit

Permalink
Merge branch 'develop' into feature/BCFR-1086-finality-violation
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaidashenko authored Dec 13, 2024
2 parents ab8add5 + 6f42463 commit ab39c5b
Show file tree
Hide file tree
Showing 66 changed files with 2,776 additions and 1,290 deletions.
5 changes: 5 additions & 0 deletions .changeset/big-camels-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#bugfix fix non-idempotent loopp registry.Register
5 changes: 5 additions & 0 deletions .changeset/giant-eels-jump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add error handling for Arbitrum RPC server timeouts. #added
16 changes: 8 additions & 8 deletions .github/e2e-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ runner-test-matrix:
- id: smoke/ccip/ccip_rmn_test.go:^TestRMN_TwoMessagesOnTwoLanesIncludingBatching$
path: integration-tests/smoke/ccip/ccip_rmn_test.go
test_env_type: docker
runs_on: ubuntu-latest
runs_on: ubuntu20.04-8cores-32GB
triggers:
- PR E2E Core Tests
- Nightly E2E Tests
Expand All @@ -982,7 +982,7 @@ runner-test-matrix:
- id: smoke/ccip/ccip_rmn_test.go:^TestRMN_MultipleMessagesOnOneLaneNoWaitForExec$
path: integration-tests/smoke/ccip/ccip_rmn_test.go
test_env_type: docker
runs_on: ubuntu-latest
runs_on: ubuntu20.04-8cores-32GB
triggers:
- PR E2E Core Tests
- Nightly E2E Tests
Expand All @@ -998,7 +998,7 @@ runner-test-matrix:
- id: smoke/ccip/ccip_rmn_test.go:^TestRMN_NotEnoughObservers$
path: integration-tests/smoke/ccip/ccip_rmn_test.go
test_env_type: docker
runs_on: ubuntu-latest
runs_on: ubuntu20.04-8cores-32GB
triggers:
- PR E2E Core Tests
- Nightly E2E Tests
Expand All @@ -1014,7 +1014,7 @@ runner-test-matrix:
- id: smoke/ccip/ccip_rmn_test.go:^TestRMN_DifferentSigners$
path: integration-tests/smoke/ccip/ccip_rmn_test.go
test_env_type: docker
runs_on: ubuntu-latest
runs_on: ubuntu20.04-8cores-32GB
triggers:
- PR E2E Core Tests
- Nightly E2E Tests
Expand All @@ -1030,7 +1030,7 @@ runner-test-matrix:
- id: smoke/ccip/ccip_rmn_test.go:^TestRMN_NotEnoughSigners$
path: integration-tests/smoke/ccip/ccip_rmn_test.go
test_env_type: docker
runs_on: ubuntu-latest
runs_on: ubuntu20.04-8cores-32GB
triggers:
- PR E2E Core Tests
- Nightly E2E Tests
Expand All @@ -1046,7 +1046,7 @@ runner-test-matrix:
- id: smoke/ccip/ccip_rmn_test.go:^TestRMN_DifferentRmnNodesForDifferentChains$
path: integration-tests/smoke/ccip/ccip_rmn_test.go
test_env_type: docker
runs_on: ubuntu-latest
runs_on: ubuntu20.04-8cores-32GB
triggers:
- PR E2E Core Tests
- Nightly E2E Tests
Expand All @@ -1062,7 +1062,7 @@ runner-test-matrix:
- id: smoke/ccip/ccip_rmn_test.go:^TestRMN_TwoMessagesOneSourceChainCursed$
path: integration-tests/smoke/ccip/ccip_rmn_test.go
test_env_type: docker
runs_on: ubuntu-latest
runs_on: ubuntu20.04-8cores-32GB
triggers:
- PR E2E Core Tests
- Nightly E2E Tests
Expand All @@ -1078,7 +1078,7 @@ runner-test-matrix:
- id: smoke/ccip/ccip_rmn_test.go:^TestRMN_GlobalCurseTwoMessagesOnTwoLanes$
path: integration-tests/smoke/ccip/ccip_rmn_test.go
test_env_type: docker
runs_on: ubuntu-latest
runs_on: ubuntu20.04-8cores-32GB
triggers:
- PR E2E Core Tests
- Nightly E2E Tests
Expand Down
9 changes: 5 additions & 4 deletions .github/workflows/ci-core-partial.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ jobs:
permissions:
id-token: write
contents: write
actions: write
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -86,7 +87,7 @@ jobs:
go-mod-download-directory: ${{ matrix.type.test-suite == 'ccip-deployment' && matrix.type.module-directory || '' }}

- name: Build Tests
uses: smartcontractkit/.github/apps/go-conditional-tests@37882e110590e636627a26371bdbd56ddfcce821 # go-conditional-tests@0.1.0
uses: smartcontractkit/.github/apps/go-conditional-tests@57f99fbea73056c490c766d50ef582a13ec4f3bb # go-conditional-tests@0.2.0
timeout-minutes: 10
with:
pipeline-step: "build"
Expand All @@ -98,7 +99,7 @@ jobs:
build-flags: ${{ matrix.type.build-flags }}

- name: Run Tests
uses: smartcontractkit/.github/apps/go-conditional-tests@37882e110590e636627a26371bdbd56ddfcce821 # go-conditional-tests@0.1.0
uses: smartcontractkit/.github/apps/go-conditional-tests@57f99fbea73056c490c766d50ef582a13ec4f3bb # go-conditional-tests@0.2.0
timeout-minutes: 15
env:
CL_DATABASE_URL: ${{ env.DB_URL }}
Expand All @@ -112,7 +113,7 @@ jobs:
github-token: ${{ secrets.GITHUB_TOKEN }}

- name: Update Test Index
uses: smartcontractkit/.github/apps/go-conditional-tests@37882e110590e636627a26371bdbd56ddfcce821 # go-conditional-tests@0.1.0
uses: smartcontractkit/.github/apps/go-conditional-tests@57f99fbea73056c490c766d50ef582a13ec4f3bb # go-conditional-tests@0.2.0
with:
pipeline-step: "update"
collect-coverage: ${{ needs.filter.outputs.should-collect-coverage }}
Expand All @@ -130,7 +131,7 @@ jobs:
if: ${{ needs.filter.outputs.should-collect-coverage == 'true' }}
runs-on: ubuntu-latest
steps:
- name: Checkout the repo
- name: Checkout the repo
uses: actions/[email protected]
with:
# fetches all history for all tags and branches to provide more metadata for sonar reports
Expand Down
6 changes: 0 additions & 6 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -583,12 +583,6 @@ packages:
github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer:
interfaces:
ORM:
ContractReader:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: contract_reader_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
Handler:
config:
mockname: "Mock{{ .InterfaceName }}"
Expand Down
15 changes: 13 additions & 2 deletions core/chains/evm/client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const (
ServiceUnavailable
TerminallyStuck
TooManyResults
ServiceTimeout
)

type ClientErrors map[int]*regexp.Regexp
Expand Down Expand Up @@ -160,7 +161,8 @@ var arbitrum = ClientErrors{
Fatal: arbitrumFatal,
L2FeeTooLow: regexp.MustCompile(`(: |^)max fee per gas less than block base fee(:|$)`),
L2Full: regexp.MustCompile(`(: |^)(queue full|sequencer pending tx pool full, please try again)(:|$)`),
ServiceUnavailable: regexp.MustCompile(`(: |^)502 Bad Gateway: [\s\S]*$|network is unreachable|i/o timeout`),
ServiceUnavailable: regexp.MustCompile(`(: |^)502 Bad Gateway: [\s\S]*$|network is unreachable|i/o timeout|(: |^)503 Service Temporarily Unavailable(:|$)`),
ServiceTimeout: regexp.MustCompile(`(: |^)408 Request Timeout(:|$)`),
}

// Treasure
Expand Down Expand Up @@ -398,6 +400,11 @@ func (s *SendError) IsServiceUnavailable(configErrors *ClientErrors) bool {
return s.is(ServiceUnavailable, configErrors) || pkgerrors.Is(s.err, commonclient.ErroringNodeError)
}

// IsServiceTimeout indicates if the error was caused by a service timeout
func (s *SendError) IsServiceTimeout(configErrors *ClientErrors) bool {
return s.is(ServiceTimeout, configErrors)
}

// IsTerminallyStuck indicates if a transaction was stuck without any chance of inclusion
func (s *SendError) IsTerminallyStuckConfigError(configErrors *ClientErrors) bool {
return s.is(TerminallyStuck, configErrors)
Expand Down Expand Up @@ -619,6 +626,10 @@ func ClassifySendError(err error, clientErrors config.ClientErrors, lggr logger.
lggr.Errorw(fmt.Sprintf("service unavailable while sending transaction %x", tx.Hash()), "err", sendError, "etx", tx)
return commonclient.Retryable
}
if sendError.IsServiceTimeout(configErrors) {
lggr.Errorw(fmt.Sprintf("service timed out while sending transaction %x", tx.Hash()), "err", sendError, "etx", tx)
return commonclient.Retryable
}
if sendError.IsTimeout() {
lggr.Errorw(fmt.Sprintf("timeout while sending transaction %x", tx.Hash()), "err", sendError, "etx", tx)
return commonclient.Retryable
Expand Down Expand Up @@ -666,7 +677,7 @@ var drpc = ClientErrors{

// Linkpool, Blockdaemon, and Chainstack all return "request timed out" if the log results are too large for them to process
var defaultClient = ClientErrors{
TooManyResults: regexp.MustCompile(`request timed out`),
TooManyResults: regexp.MustCompile(`request timed out|408 Request Timed Out`),
}

// JSON-RPC error codes which can indicate a refusal of the server to process an eth_getLogs request because the result set is too large
Expand Down
15 changes: 15 additions & 0 deletions core/chains/evm/client/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ func Test_Eth_Errors(t *testing.T) {
{"network is unreachable", true, "Arbitrum"},
{"client error service unavailable", true, "tomlConfig"},
{"[Request ID: 825608a8-fd8a-4b5b-aea7-92999509306d] Error invoking RPC: [Request ID: 825608a8-fd8a-4b5b-aea7-92999509306d] Transaction execution returns a null value for transaction", true, "hedera"},
{"call failed: 503 Service Temporarily Unavailable: <html>\r\n<head><title>503 Service Temporarily Unavailable</title></head>\r\n<body>\r\n<center><h1>503 Service Temporarily Unavailable</h1></center>\r\n</body>\r\n</html>\r\n", true, "Arbitrum"},
}
for _, test := range tests {
err = evmclient.NewSendErrorS(test.message)
Expand All @@ -260,6 +261,20 @@ func Test_Eth_Errors(t *testing.T) {
}
})

t.Run("IsServiceTimeout", func(t *testing.T) {
tests := []errorCase{
{"call failed: 408 Request Timeout: {", true, "Arbitrum"},
{"408 Request Timeout: {\"id\":303,\"jsonrpc\":\"2.0\",\"error\":{\"code\\\":-32009,\\\"message\\\":\\\"request timeout\\\"}}\",\"errVerbose\":\"408 Request Timeout:\n", true, "Arbitrum"},
{"request timeout", false, "tomlConfig"},
}
for _, test := range tests {
err = evmclient.NewSendErrorS(test.message)
assert.Equal(t, err.IsServiceTimeout(clientErrors), test.expect)
err = newSendErrorWrapped(test.message)
assert.Equal(t, err.IsServiceTimeout(clientErrors), test.expect)
}
})

t.Run("IsTxFeeExceedsCap", func(t *testing.T) {
tests := []errorCase{
{"tx fee (1.10 ether) exceeds the configured cap (1.00 ether)", true, "geth"},
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/prometheus/client_golang v1.20.5
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241211192225-0b03fa331a49
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1142,8 +1142,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB
github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0 h1:/1L+v4SxUD2K5RMRbfByyLfePMAgQKeD0onSetPnGmA=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0/go.mod h1:F8xQAIW0ymb2BZhqn89sWZLXreJhM5KDVF6Qb4y44N0=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241211192225-0b03fa331a49 h1:OSWD6jugnwJz3IuLsIPpQn+3l9Xlww28j3Qr+5QILBI=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241211192225-0b03fa331a49/go.mod h1:bQktEJf7sJ0U3SmIcXvbGUox7SmXcnSEZ4kUbT8R5Nk=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49 h1:ZA92CTX9JtEArrxgZw7PNctVxFS+/DmSXumkwf1WiMY=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49/go.mod h1:bQktEJf7sJ0U3SmIcXvbGUox7SmXcnSEZ4kUbT8R5Nk=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241202141438-a90db35252db h1:N1RH1hSr2ACzOFc9hkCcjE8pRBTdcU3p8nsTJByaLes=
Expand Down
59 changes: 48 additions & 11 deletions core/services/llo/mercurytransmitter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ var (
},
[]string{"donID", "serverURL", "code"},
)
promTransmitConcurrentTransmitGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "llo",
Subsystem: "mercurytransmitter",
Name: "concurrent_transmit_gauge",
Help: "Gauge that measures the number of transmit threads currently waiting on a remote transmit call. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.",
},
[]string{"donID", "serverURL"},
)
promTransmitConcurrentDeleteGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "llo",
Subsystem: "mercurytransmitter",
Name: "concurrent_delete_gauge",
Help: "Gauge that measures the number of delete threads currently waiting on a delete call to the DB. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.",
},
[]string{"donID", "serverURL"},
)
)

type ReportPacker interface {
Expand All @@ -87,12 +103,14 @@ type server struct {
evmPremiumLegacyPacker ReportPacker
jsonPacker ReportPacker

transmitSuccessCount prometheus.Counter
transmitDuplicateCount prometheus.Counter
transmitConnectionErrorCount prometheus.Counter
transmitQueueDeleteErrorCount prometheus.Counter
transmitQueueInsertErrorCount prometheus.Counter
transmitQueuePushErrorCount prometheus.Counter
transmitSuccessCount prometheus.Counter
transmitDuplicateCount prometheus.Counter
transmitConnectionErrorCount prometheus.Counter
transmitQueueDeleteErrorCount prometheus.Counter
transmitQueueInsertErrorCount prometheus.Counter
transmitQueuePushErrorCount prometheus.Counter
transmitConcurrentTransmitGauge prometheus.Gauge
transmitConcurrentDeleteGauge prometheus.Gauge

transmitThreadBusyCount atomic.Int32
deleteThreadBusyCount atomic.Int32
Expand Down Expand Up @@ -130,6 +148,8 @@ func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client
promTransmitQueueDeleteErrorCount.WithLabelValues(donIDStr, serverURL),
promTransmitQueueInsertErrorCount.WithLabelValues(donIDStr, serverURL),
promTransmitQueuePushErrorCount.WithLabelValues(donIDStr, serverURL),
promTransmitConcurrentTransmitGauge.WithLabelValues(donIDStr, serverURL),
promTransmitConcurrentDeleteGauge.WithLabelValues(donIDStr, serverURL),
atomic.Int32{},
atomic.Int32{},
}
Expand Down Expand Up @@ -161,7 +181,7 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup
select {
case hash := <-s.deleteQueue:
for {
s.deleteThreadBusyCount.Add(1)
s.deleteThreadBusyCountInc()
if err := s.pm.orm.Delete(ctx, [][32]byte{hash}); err != nil {
s.lggr.Errorw("Failed to delete transmission record", "err", err, "transmissionHash", hash)
s.transmitQueueDeleteErrorCount.Inc()
Expand All @@ -170,7 +190,7 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup
// Wait a backoff duration before trying to delete again
continue
case <-stopCh:
s.deleteThreadBusyCount.Add(-1)
s.deleteThreadBusyCountDec()
// abort and return immediately on stop even if items remain in queue
return
}
Expand All @@ -179,14 +199,31 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup
}
// success
b.Reset()
s.deleteThreadBusyCount.Add(-1)
s.deleteThreadBusyCountDec()
case <-stopCh:
// abort and return immediately on stop even if items remain in queue
return
}
}
}

func (s *server) transmitThreadBusyCountInc() {
val := s.transmitThreadBusyCount.Add(1)
s.transmitConcurrentTransmitGauge.Set(float64(val))
}
func (s *server) transmitThreadBusyCountDec() {
val := s.transmitThreadBusyCount.Add(-1)
s.transmitConcurrentTransmitGauge.Set(float64(val))
}
func (s *server) deleteThreadBusyCountInc() {
val := s.deleteThreadBusyCount.Add(1)
s.transmitConcurrentDeleteGauge.Set(float64(val))
}
func (s *server) deleteThreadBusyCountDec() {
val := s.deleteThreadBusyCount.Add(-1)
s.transmitConcurrentDeleteGauge.Set(float64(val))
}

func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donIDStr string) {
defer wg.Done()
// Exponential backoff with very short retry interval (since latency is a priority)
Expand All @@ -208,8 +245,8 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI
return false
}

s.transmitThreadBusyCount.Add(1)
defer s.transmitThreadBusyCount.Add(-1)
s.transmitThreadBusyCountInc()
defer s.transmitThreadBusyCountDec()

req, res, err := func(ctx context.Context) (*pb.TransmitRequest, *pb.TransmitResponse, error) {
ctx, cancelFn := context.WithTimeout(ctx, utils.WithJitter(s.transmitTimeout))
Expand Down
Loading

0 comments on commit ab39c5b

Please sign in to comment.