Skip to content

Commit

Permalink
plugins/cmd/chainlink-example-relay: add example relay
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Jan 16, 2025
1 parent f9cc514 commit 1852089
Show file tree
Hide file tree
Showing 23 changed files with 120 additions and 373 deletions.
2 changes: 1 addition & 1 deletion core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G

mailMon := mailbox.NewMonitor(cfg.AppID().String(), appLggr.Named("Mailbox"))

loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Tracing(), cfg.Telemetry(), beholderAuthHeaders, csaPubKeyHex)
loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Database(), cfg.Tracing(), cfg.Telemetry(), beholderAuthHeaders, csaPubKeyHex)

mercuryPool := wsrpc.NewPool(appLggr, cache.Config{
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
Expand Down
2 changes: 1 addition & 1 deletion core/cmd/shell_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,7 @@ type dbConfig interface {
MaxOpenConns() int
MaxIdleConns() int
URL() url.URL
Dialect() pgcommon.DialectName
Dialect() string
}

func newConnection(ctx context.Context, cfg dbConfig) (*sqlx.DB, error) {
Expand Down
2 changes: 1 addition & 1 deletion core/cmd/shell_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
func genTestEVMRelayers(t *testing.T, opts legacyevm.ChainRelayOpts, ks evmrelayer.CSAETHKeystore) *chainlink.CoreRelayerChainInteroperators {
f := chainlink.RelayerFactory{
Logger: opts.Logger,
LoopRegistry: plugins.NewLoopRegistry(opts.Logger, opts.AppConfig.Tracing(), opts.AppConfig.Telemetry(), nil, ""),
LoopRegistry: plugins.NewLoopRegistry(opts.Logger, opts.AppConfig.Database(), opts.AppConfig.Tracing(), opts.AppConfig.Telemetry(), nil, ""),
CapabilitiesRegistry: capabilities.NewRegistry(opts.Logger),
}

Expand Down
4 changes: 2 additions & 2 deletions core/cmd/shell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func TestNewUserCache(t *testing.T) {

func TestSetupSolanaRelayer(t *testing.T) {
lggr := logger.TestLogger(t)
reg := plugins.NewLoopRegistry(lggr, nil, nil, nil, "")
reg := plugins.NewTestLoopRegistry(lggr)
ks := mocks.NewSolana(t)
ds := sqltest.NewNoOpDataSource()

Expand Down Expand Up @@ -483,7 +483,7 @@ func TestSetupSolanaRelayer(t *testing.T) {

func TestSetupStarkNetRelayer(t *testing.T) {
lggr := logger.TestLogger(t)
reg := plugins.NewLoopRegistry(lggr, nil, nil, nil, "")
reg := plugins.NewTestLoopRegistry(lggr)
ks := mocks.NewStarkNet(t)
// config 3 chains but only enable 2 => should only be 2 relayer
nEnabledChains := 2
Expand Down
4 changes: 1 addition & 3 deletions core/config/database_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package config
import (
"net/url"
"time"

pgcommon "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg"
)

type Backup interface {
Expand Down Expand Up @@ -35,7 +33,7 @@ type Database interface {
DefaultIdleInTxSessionTimeout() time.Duration
DefaultLockTimeout() time.Duration
DefaultQueryTimeout() time.Duration
Dialect() pgcommon.DialectName
Dialect() string
LogSQL() bool
MaxIdleConns() int
MaxOpenConns() int
Expand Down
4 changes: 1 addition & 3 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
ocrcommontypes "github.com/smartcontractkit/libocr/commontypes"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
pgcommon "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg"

"github.com/smartcontractkit/chainlink/v2/core/build"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/config"
Expand Down Expand Up @@ -338,7 +336,7 @@ type Database struct {
DefaultIdleInTxSessionTimeout *commonconfig.Duration
DefaultLockTimeout *commonconfig.Duration
DefaultQueryTimeout *commonconfig.Duration
Dialect pgcommon.DialectName `toml:"-"`
Dialect string `toml:"-"`
LogQueries *bool
MaxIdleConns *int64
MaxOpenConns *int64
Expand Down
4 changes: 2 additions & 2 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
keyStore := keystore.NewInMemory(ds, utils.FastScryptParams, lggr)

mailMon := mailbox.NewMonitor(cfg.AppID().String(), lggr.Named("Mailbox"))
loopRegistry := plugins.NewLoopRegistry(lggr, nil, nil, nil, "")
loopRegistry := plugins.NewTestLoopRegistry(lggr)

mercuryPool := wsrpc.NewPool(lggr, cache.Config{
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
Expand Down Expand Up @@ -498,7 +498,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
RestrictedHTTPClient: c,
UnrestrictedHTTPClient: c,
SecretGenerator: MockSecretGenerator{},
LoopRegistry: plugins.NewLoopRegistry(lggr, nil, nil, nil, ""),
LoopRegistry: plugins.NewTestLoopRegistry(lggr),
MercuryPool: mercuryPool,
CapabilitiesRegistry: capabilitiesRegistry,
CapabilitiesDispatcher: dispatcher,
Expand Down
2 changes: 1 addition & 1 deletion core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
if err != nil {
return nil, fmt.Errorf("could not build Beholder auth: %w", err)
}
loopRegistry = plugins.NewLoopRegistry(globalLogger, opts.Config.Tracing(), opts.Config.Telemetry(), beholderAuthHeaders, csaPubKeyHex)
loopRegistry = plugins.NewLoopRegistry(globalLogger, opts.Config.Database(), opts.Config.Tracing(), opts.Config.Telemetry(), beholderAuthHeaders, csaPubKeyHex)
}

// If the audit logger is enabled
Expand Down
4 changes: 1 addition & 3 deletions core/services/chainlink/config_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"net/url"
"time"

pgcommon "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg"

"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/config/toml"
)
Expand Down Expand Up @@ -113,7 +111,7 @@ func (d *databaseConfig) URL() url.URL {
return *d.s.URL.URL()
}

func (d *databaseConfig) Dialect() pgcommon.DialectName {
func (d *databaseConfig) Dialect() string {
return d.c.Dialect
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestCoreRelayerChainInteroperators(t *testing.T) {

factory := chainlink.RelayerFactory{
Logger: lggr,
LoopRegistry: plugins.NewLoopRegistry(lggr, nil, nil, nil, ""),
LoopRegistry: plugins.NewTestLoopRegistry(lggr),
GRPCOpts: loop.GRPCOpts{},
CapabilitiesRegistry: capabilities.NewRegistry(lggr),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func setupNodeCCIP(
beholderAuthHeaders, csaPubKeyHex, err := keystore.BuildBeholderAuth(keyStore)
require.NoError(t, err)

loopRegistry := plugins.NewLoopRegistry(lggr.Named("LoopRegistry"), config.Tracing(), config.Telemetry(), beholderAuthHeaders, csaPubKeyHex)
loopRegistry := plugins.NewLoopRegistry(lggr.Named("LoopRegistry"), config.Database(), config.Tracing(), config.Telemetry(), beholderAuthHeaders, csaPubKeyHex)
relayerFactory := chainlink.RelayerFactory{
Logger: lggr,
LoopRegistry: loopRegistry,
Expand Down Expand Up @@ -494,7 +494,7 @@ func setupNodeCCIP(
RestrictedHTTPClient: &http.Client{},
AuditLogger: audit.NoopLogger,
MailMon: mailMon,
LoopRegistry: plugins.NewLoopRegistry(lggr, config.Tracing(), config.Telemetry(), beholderAuthHeaders, csaPubKeyHex),
LoopRegistry: plugins.NewLoopRegistry(lggr, config.Database(), config.Tracing(), config.Telemetry(), beholderAuthHeaders, csaPubKeyHex),
})
require.NoError(t, err)
require.NoError(t, app.GetKeyStore().Unlock(ctx, "password"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func setupNodeCCIP(
},
CSAETHKeystore: simEthKeyStore,
}
loopRegistry := plugins.NewLoopRegistry(lggr.Named("LoopRegistry"), config.Tracing(), config.Telemetry(), nil, "")
loopRegistry := plugins.NewLoopRegistry(lggr.Named("LoopRegistry"), config.Database(), config.Tracing(), config.Telemetry(), nil, "")
relayerFactory := chainlink.RelayerFactory{
Logger: lggr,
LoopRegistry: loopRegistry,
Expand Down Expand Up @@ -490,7 +490,7 @@ func setupNodeCCIP(
RestrictedHTTPClient: &http.Client{},
AuditLogger: audit.NoopLogger,
MailMon: mailMon,
LoopRegistry: plugins.NewLoopRegistry(lggr, config.Tracing(), config.Telemetry(), nil, ""),
LoopRegistry: plugins.NewLoopRegistry(lggr, config.Database(), config.Tracing(), config.Telemetry(), nil, ""),
})
ctx := testutils.Context(t)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/mercury/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func TestNewServices(t *testing.T) {

t.Run("restartable loop", func(t *testing.T) {
// setup a real loop registry to test restartability
registry := plugins.NewLoopRegistry(logger.TestLogger(t), nil, nil, nil, "")
registry := plugins.NewTestLoopRegistry(logger.TestLogger(t))
loopRegistrarConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, registry.Register, registry.Unregister)
prodCfg := mercuryocr2.NewMercuryConfig(1, 1, loopRegistrarConfig)
type args struct {
Expand Down
96 changes: 19 additions & 77 deletions core/services/pg/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,20 @@ package pg

import (
"context"
"database/sql"
"errors"
"fmt"
"log"
"os"
"time"

"github.com/XSAM/otelsql"
"github.com/google/uuid"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/stdlib"
_ "github.com/jackc/pgx/v4/stdlib" // need to make sure pgx driver is registered before opening connection
"github.com/jmoiron/sqlx"
"github.com/scylladb/go-reflectx"
"go.opentelemetry.io/otel"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"

pgcommon "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg"
commonpg "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil/sqltest"
)

// NOTE: This is the default level in Postgres anyway, we just make it
// explicit here
const defaultIsolation = sql.LevelReadCommitted

var MinRequiredPGVersion = 110000

func init() {
Expand All @@ -51,81 +41,33 @@ type ConnectionConfig interface {
MaxIdleConns() int
}

func NewConnection(ctx context.Context, uri string, dialect pgcommon.DialectName, config ConnectionConfig) (*sqlx.DB, error) {
opts := []otelsql.Option{otelsql.WithAttributes(semconv.DBSystemPostgreSQL),
otelsql.WithTracerProvider(otel.GetTracerProvider()),
otelsql.WithSQLCommenter(true),
otelsql.WithSpanOptions(otelsql.SpanOptions{
OmitConnResetSession: true,
OmitConnPrepare: true,
OmitRows: true,
OmitConnectorConnect: true,
OmitConnQuery: false,
})}

// Set default connection options
lockTimeout := config.DefaultLockTimeout().Milliseconds()
idleInTxSessionTimeout := config.DefaultIdleInTxSessionTimeout().Milliseconds()
connParams := fmt.Sprintf(`SET TIME ZONE 'UTC'; SET lock_timeout = %d; SET idle_in_transaction_session_timeout = %d; SET default_transaction_isolation = %q`,
lockTimeout, idleInTxSessionTimeout, defaultIsolation.String())

var sqldb *sql.DB
if dialect == pgcommon.TransactionWrappedPostgres {
// Dbtx uses the uri as a unique identifier for each transaction. Each ORM
// should be encapsulated in it's own transaction, and thus needs its own
// unique id.
//
// We can happily throw away the original uri here because if we are using
// txdb it should have already been set at the point where we called
// txdb.Register

err := pgcommon.RegisterTxDb(uri)
if err != nil {
return nil, fmt.Errorf("failed to register txdb: %w", err)
}
sqldb, err = otelsql.Open(string(dialect), uuid.New().String(), opts...)
if err != nil {
return nil, fmt.Errorf("failed to open txdb: %w", err)
}
_, err = sqldb.ExecContext(ctx, connParams)
if err != nil {
return nil, fmt.Errorf("failed to set options: %w", err)
}
} else {
// Set sane defaults for every new database connection.
// Those can be overridden with Txn options or SET statements in individual connections.
// The default values are the same for Txns.
connConfig, err := pgx.ParseConfig(uri)
if err != nil {
return nil, fmt.Errorf("database: failed to parse config: %w", err)
func NewConnection(ctx context.Context, uri string, driverName string, config ConnectionConfig) (db *sqlx.DB, err error) {
if driverName == commonpg.DriverTxWrappedPostgres {
if err = sqltest.RegisterTxDB(uri); err != nil {
return nil, fmt.Errorf("failed to register %s: %w", commonpg.DriverTxWrappedPostgres, err)
}

connector := stdlib.GetConnector(*connConfig, stdlib.OptionAfterConnect(func(ctx context.Context, c *pgx.Conn) (err error) {
_, err = c.Exec(ctx, connParams)
return
}))

// Initialize sql/sqlx
sqldb = otelsql.OpenDB(connector, opts...)
}
db := sqlx.NewDb(sqldb, string(dialect))
db.MapperFunc(reflectx.CamelToSnakeASCII)

setMaxConns(db, config)
db, err = commonpg.DBConfig{
IdleInTxSessionTimeout: config.DefaultIdleInTxSessionTimeout(),
LockTimeout: config.DefaultLockTimeout(),
MaxOpenConns: config.MaxOpenConns(),
MaxIdleConns: config.MaxIdleConns(),
}.New(ctx, uri, driverName)
if err != nil {
return nil, err
}
setMaxMercuryConns(db, config)

if os.Getenv("SKIP_PG_VERSION_CHECK") != "true" {
if err := checkVersion(db, MinRequiredPGVersion); err != nil {
return nil, err
}
}

return db, disallowReplica(db)
return db, nil
}

func setMaxConns(db *sqlx.DB, config ConnectionConfig) {
db.SetMaxOpenConns(config.MaxOpenConns())
db.SetMaxIdleConns(config.MaxIdleConns())

func setMaxMercuryConns(db *sqlx.DB, config ConnectionConfig) {
// HACK: In the case of mercury jobs, one conn is needed per job for good
// performance. Most nops will forget to increase the defaults to account
// for this so we detect it here instead.
Expand Down
8 changes: 3 additions & 5 deletions core/services/pg/locked_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"github.com/jmoiron/sqlx"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg"

"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/static"
Expand All @@ -29,7 +27,7 @@ type LockedDBConfig interface {
ConnectionConfig
URL() url.URL
DefaultQueryTimeout() time.Duration
Dialect() pg.DialectName
Dialect() string
}

type lockedDb struct {
Expand Down Expand Up @@ -73,14 +71,14 @@ func (l *lockedDb) Open(ctx context.Context) (err error) {
}
revert := func() {
// Let Open() return the actual error, while l.Close() error is just logged.
if err2 := l.close(); err2 != nil {
if err2 := l.Close(); err2 != nil {
l.lggr.Errorf("failed to cleanup LockedDB: %v", err2)
}
}

// Step 2: start the stat reporter
l.statsReporter = NewStatsReporter(l.db.Stats, l.lggr)
l.statsReporter.Start(ctx)
l.statsReporter.Start()

// Step 3: acquire DB locks
lockingMode := l.lockCfg.LockingMode()
Expand Down
Loading

0 comments on commit 1852089

Please sign in to comment.