Skip to content

Commit

Permalink
Merge branch 'develop' into mk/part2/ccip-4524
Browse files Browse the repository at this point in the history
  • Loading branch information
makramkd committed Dec 12, 2024
2 parents 483542f + 52c2db4 commit 39a6997
Show file tree
Hide file tree
Showing 66 changed files with 1,307 additions and 1,146 deletions.
5 changes: 5 additions & 0 deletions .changeset/big-camels-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#bugfix fix non-idempotent loopp registry.Register
5 changes: 5 additions & 0 deletions .changeset/giant-eels-jump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add error handling for Arbitrum RPC server timeouts. #added
9 changes: 5 additions & 4 deletions .github/workflows/ci-core-partial.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ jobs:
permissions:
id-token: write
contents: write
actions: write
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -86,7 +87,7 @@ jobs:
go-mod-download-directory: ${{ matrix.type.test-suite == 'ccip-deployment' && matrix.type.module-directory || '' }}

- name: Build Tests
uses: smartcontractkit/.github/apps/go-conditional-tests@37882e110590e636627a26371bdbd56ddfcce821 # go-conditional-tests@0.1.0
uses: smartcontractkit/.github/apps/go-conditional-tests@57f99fbea73056c490c766d50ef582a13ec4f3bb # go-conditional-tests@0.2.0
timeout-minutes: 10
with:
pipeline-step: "build"
Expand All @@ -98,7 +99,7 @@ jobs:
build-flags: ${{ matrix.type.build-flags }}

- name: Run Tests
uses: smartcontractkit/.github/apps/go-conditional-tests@37882e110590e636627a26371bdbd56ddfcce821 # go-conditional-tests@0.1.0
uses: smartcontractkit/.github/apps/go-conditional-tests@57f99fbea73056c490c766d50ef582a13ec4f3bb # go-conditional-tests@0.2.0
timeout-minutes: 15
env:
CL_DATABASE_URL: ${{ env.DB_URL }}
Expand All @@ -112,7 +113,7 @@ jobs:
github-token: ${{ secrets.GITHUB_TOKEN }}

- name: Update Test Index
uses: smartcontractkit/.github/apps/go-conditional-tests@37882e110590e636627a26371bdbd56ddfcce821 # go-conditional-tests@0.1.0
uses: smartcontractkit/.github/apps/go-conditional-tests@57f99fbea73056c490c766d50ef582a13ec4f3bb # go-conditional-tests@0.2.0
with:
pipeline-step: "update"
collect-coverage: ${{ needs.filter.outputs.should-collect-coverage }}
Expand All @@ -130,7 +131,7 @@ jobs:
if: ${{ needs.filter.outputs.should-collect-coverage == 'true' }}
runs-on: ubuntu-latest
steps:
- name: Checkout the repo
- name: Checkout the repo
uses: actions/checkout@v4.2.1
with:
# fetches all history for all tags and branches to provide more metadata for sonar reports
Expand Down
6 changes: 0 additions & 6 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -583,12 +583,6 @@ packages:
github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer:
interfaces:
ORM:
ContractReader:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: contract_reader_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
Handler:
config:
mockname: "Mock{{ .InterfaceName }}"
Expand Down
15 changes: 13 additions & 2 deletions core/chains/evm/client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const (
ServiceUnavailable
TerminallyStuck
TooManyResults
ServiceTimeout
)

type ClientErrors map[int]*regexp.Regexp
Expand Down Expand Up @@ -160,7 +161,8 @@ var arbitrum = ClientErrors{
Fatal: arbitrumFatal,
L2FeeTooLow: regexp.MustCompile(`(: |^)max fee per gas less than block base fee(:|$)`),
L2Full: regexp.MustCompile(`(: |^)(queue full|sequencer pending tx pool full, please try again)(:|$)`),
ServiceUnavailable: regexp.MustCompile(`(: |^)502 Bad Gateway: [\s\S]*$|network is unreachable|i/o timeout`),
ServiceUnavailable: regexp.MustCompile(`(: |^)502 Bad Gateway: [\s\S]*$|network is unreachable|i/o timeout|(: |^)503 Service Temporarily Unavailable(:|$)`),
ServiceTimeout: regexp.MustCompile(`(: |^)408 Request Timeout(:|$)`),
}

// Treasure
Expand Down Expand Up @@ -398,6 +400,11 @@ func (s *SendError) IsServiceUnavailable(configErrors *ClientErrors) bool {
return s.is(ServiceUnavailable, configErrors) || pkgerrors.Is(s.err, commonclient.ErroringNodeError)
}

// IsServiceTimeout reports whether this send error falls into the
// ServiceTimeout category. The decision is delegated to s.is, which is given
// the ServiceTimeout key and the caller-supplied configErrors.
func (s *SendError) IsServiceTimeout(configErrors *ClientErrors) bool {
	timedOut := s.is(ServiceTimeout, configErrors)
	return timedOut
}

// IsTerminallyStuckConfigError indicates if a transaction was stuck without any chance of inclusion
func (s *SendError) IsTerminallyStuckConfigError(configErrors *ClientErrors) bool {
return s.is(TerminallyStuck, configErrors)
Expand Down Expand Up @@ -619,6 +626,10 @@ func ClassifySendError(err error, clientErrors config.ClientErrors, lggr logger.
lggr.Errorw(fmt.Sprintf("service unavailable while sending transaction %x", tx.Hash()), "err", sendError, "etx", tx)
return commonclient.Retryable
}
if sendError.IsServiceTimeout(configErrors) {
lggr.Errorw(fmt.Sprintf("service timed out while sending transaction %x", tx.Hash()), "err", sendError, "etx", tx)
return commonclient.Retryable
}
if sendError.IsTimeout() {
lggr.Errorw(fmt.Sprintf("timeout while sending transaction %x", tx.Hash()), "err", sendError, "etx", tx)
return commonclient.Retryable
Expand Down Expand Up @@ -666,7 +677,7 @@ var drpc = ClientErrors{

// Linkpool, Blockdaemon, and Chainstack all return "request timed out" if the log results are too large for them to process
var defaultClient = ClientErrors{
TooManyResults: regexp.MustCompile(`request timed out`),
TooManyResults: regexp.MustCompile(`request timed out|408 Request Timed Out`),
}

// JSON-RPC error codes which can indicate a refusal of the server to process an eth_getLogs request because the result set is too large
Expand Down
15 changes: 15 additions & 0 deletions core/chains/evm/client/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ func Test_Eth_Errors(t *testing.T) {
{"network is unreachable", true, "Arbitrum"},
{"client error service unavailable", true, "tomlConfig"},
{"[Request ID: 825608a8-fd8a-4b5b-aea7-92999509306d] Error invoking RPC: [Request ID: 825608a8-fd8a-4b5b-aea7-92999509306d] Transaction execution returns a null value for transaction", true, "hedera"},
{"call failed: 503 Service Temporarily Unavailable: <html>\r\n<head><title>503 Service Temporarily Unavailable</title></head>\r\n<body>\r\n<center><h1>503 Service Temporarily Unavailable</h1></center>\r\n</body>\r\n</html>\r\n", true, "Arbitrum"},
}
for _, test := range tests {
err = evmclient.NewSendErrorS(test.message)
Expand All @@ -260,6 +261,20 @@ func Test_Eth_Errors(t *testing.T) {
}
})

t.Run("IsServiceTimeout", func(t *testing.T) {
tests := []errorCase{
{"call failed: 408 Request Timeout: {", true, "Arbitrum"},
{"408 Request Timeout: {\"id\":303,\"jsonrpc\":\"2.0\",\"error\":{\"code\\\":-32009,\\\"message\\\":\\\"request timeout\\\"}}\",\"errVerbose\":\"408 Request Timeout:\n", true, "Arbitrum"},
{"request timeout", false, "tomlConfig"},
}
for _, test := range tests {
err = evmclient.NewSendErrorS(test.message)
assert.Equal(t, err.IsServiceTimeout(clientErrors), test.expect)
err = newSendErrorWrapped(test.message)
assert.Equal(t, err.IsServiceTimeout(clientErrors), test.expect)
}
})

t.Run("IsTxFeeExceedsCap", func(t *testing.T) {
tests := []errorCase{
{"tx fee (1.10 ether) exceeds the configured cap (1.00 ether)", true, "geth"},
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/prometheus/client_golang v1.20.5
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1142,8 +1142,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB
github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0 h1:/1L+v4SxUD2K5RMRbfByyLfePMAgQKeD0onSetPnGmA=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241211150100-7683331f64a0/go.mod h1:F8xQAIW0ymb2BZhqn89sWZLXreJhM5KDVF6Qb4y44N0=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83 h1:NjrU7KOn3Tk+C6QFo9tQBqeotPKytpBwhn/J1s+yiiY=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210192653-a9c706f99e83/go.mod h1:bQktEJf7sJ0U3SmIcXvbGUox7SmXcnSEZ4kUbT8R5Nk=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49 h1:ZA92CTX9JtEArrxgZw7PNctVxFS+/DmSXumkwf1WiMY=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241212163958-6a43e61b9d49/go.mod h1:bQktEJf7sJ0U3SmIcXvbGUox7SmXcnSEZ4kUbT8R5Nk=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241202141438-a90db35252db h1:N1RH1hSr2ACzOFc9hkCcjE8pRBTdcU3p8nsTJByaLes=
Expand Down
89 changes: 60 additions & 29 deletions core/services/ocr2/plugins/mercury/plugin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mercury

import (
"context"
"encoding/json"
"fmt"
"os/exec"
Expand Down Expand Up @@ -79,14 +80,13 @@ func NewServices(
return nil, errors.New("expected job to have a non-nil PipelineSpec")
}

var err error
var pluginConfig config.PluginConfig
if len(jb.OCR2OracleSpec.PluginConfig) == 0 {
if !enableTriggerCapability {
return nil, fmt.Errorf("at least one transmission option must be configured")
}
} else {
err = json.Unmarshal(jb.OCR2OracleSpec.PluginConfig.Bytes(), &pluginConfig)
err := json.Unmarshal(jb.OCR2OracleSpec.PluginConfig.Bytes(), &pluginConfig)
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -101,8 +101,8 @@ func NewServices(
// encapsulate all the subservices and ensure we close them all if any fail to start
srvs := []job.ServiceCtx{ocr2Provider}
abort := func() {
if err = services.MultiCloser(srvs).Close(); err != nil {
lggr.Errorw("Error closing unused services", "err", err)
if cerr := services.MultiCloser(srvs).Close(); cerr != nil {
lggr.Errorw("Error closing unused services", "err", cerr)
}
}
saver := ocrcommon.NewResultRunSaver(pipelineRunner, lggr, cfg.MaxSuccessfulRuns(), cfg.ResultWriteQueueDepth())
Expand All @@ -112,6 +112,7 @@ func NewServices(
var (
factory ocr3types.MercuryPluginFactory
factoryServices []job.ServiceCtx
fErr error
)
fCfg := factoryCfg{
orm: orm,
Expand All @@ -127,31 +128,31 @@ func NewServices(
}
switch feedID.Version() {
case 1:
factory, factoryServices, err = newv1factory(fCfg)
if err != nil {
factory, factoryServices, fErr = newv1factory(fCfg)
if fErr != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v1 factory: %w", err)
return nil, fmt.Errorf("failed to create mercury v1 factory: %w", fErr)
}
srvs = append(srvs, factoryServices...)
case 2:
factory, factoryServices, err = newv2factory(fCfg)
if err != nil {
factory, factoryServices, fErr = newv2factory(fCfg)
if fErr != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v2 factory: %w", err)
return nil, fmt.Errorf("failed to create mercury v2 factory: %w", fErr)
}
srvs = append(srvs, factoryServices...)
case 3:
factory, factoryServices, err = newv3factory(fCfg)
if err != nil {
factory, factoryServices, fErr = newv3factory(fCfg)
if fErr != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v3 factory: %w", err)
return nil, fmt.Errorf("failed to create mercury v3 factory: %w", fErr)
}
srvs = append(srvs, factoryServices...)
case 4:
factory, factoryServices, err = newv4factory(fCfg)
if err != nil {
factory, factoryServices, fErr = newv4factory(fCfg)
if fErr != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v4 factory: %w", err)
return nil, fmt.Errorf("failed to create mercury v4 factory: %w", fErr)
}
srvs = append(srvs, factoryServices...)
default:
Expand Down Expand Up @@ -214,13 +215,14 @@ func newv4factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loop mode, the factory is a gRPC server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV4Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand Down Expand Up @@ -253,13 +255,14 @@ func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is a gRPC server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV3Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand Down Expand Up @@ -292,13 +295,14 @@ func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is a gRPC server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV2Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand Down Expand Up @@ -329,13 +333,14 @@ func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is a gRPC server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV1Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand All @@ -344,20 +349,46 @@ func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
return factory, srvs, nil
}

func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, loop.GRPCOpts, logger.Logger, error) {
func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, *loopUnregisterCloser, loop.GRPCOpts, logger.Logger, error) {
lggr.Debugw("Initializing Mercury loop", "command", cmd)
mercuryLggr := lggr.Named(fmt.Sprintf("MercuryV%d", feedID.Version())).Named(feedID.String())
envVars, err := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get())
if err != nil {
return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err)
return nil, nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err)
}
loopID := mercuryLggr.Name()
cmdFn, opts, err := cfg.RegisterLOOP(plugins.CmdConfig{
ID: mercuryLggr.Name(),
ID: loopID,
Cmd: cmd,
Env: envVars,
})
if err != nil {
return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err)
return nil, nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err)
}
return cmdFn, newLoopUnregister(cfg, loopID), opts, mercuryLggr, nil
}

// loopUnregisterCloser wraps the unregistration of a LOOP as a service:
// its Start is a no-op and its Close calls UnregisterLOOP on r with id.
//
// TODO BCF-3451: all other jobs that use custom plugin providers should be
// refactored to use this pattern; perhaps it can be implemented in the
// delegate on job delete.
type loopUnregisterCloser struct {
// r is the registrar on which UnregisterLOOP is invoked when the closer is closed.
r plugins.RegistrarConfig
// id is the LOOP identifier handed to UnregisterLOOP.
id string
}

// Close unregisters the LOOP identified by l.id from the registrar l.r.
// It always returns nil.
func (l *loopUnregisterCloser) Close() error {
	registrar, loopID := l.r, l.id
	registrar.UnregisterLOOP(loopID)
	return nil
}

// Start is a no-op: it ignores ctx and always returns nil.
// NOTE(review): presumably present only so the closer can be handled like the
// other services appended to the factory's service list — confirm against the
// service interface expected there.
func (l *loopUnregisterCloser) Start(ctx context.Context) error {
return nil
}

// newLoopUnregister returns a loopUnregisterCloser that stores the given
// registrar r and LOOP id for use when the closer is closed.
func newLoopUnregister(r plugins.RegistrarConfig, id string) *loopUnregisterCloser {
return &loopUnregisterCloser{
r: r,
id: id,
}
return cmdFn, opts, mercuryLggr, nil
}
Loading

0 comments on commit 39a6997

Please sign in to comment.