diff --git a/core/cmd/shell.go b/core/cmd/shell.go index 09eacd7dc39..731b85f8b0e 100644 --- a/core/cmd/shell.go +++ b/core/cmd/shell.go @@ -229,7 +229,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G mailMon := mailbox.NewMonitor(cfg.AppID().String(), appLggr.Named("Mailbox")) - loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Tracing(), cfg.Telemetry(), beholderAuthHeaders, csaPubKeyHex) + loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Database(), cfg.Tracing(), cfg.Telemetry(), beholderAuthHeaders, csaPubKeyHex) mercuryPool := wsrpc.NewPool(appLggr, cache.Config{ LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(), diff --git a/core/cmd/shell_local.go b/core/cmd/shell_local.go index 07b593ac978..05486731bbb 100644 --- a/core/cmd/shell_local.go +++ b/core/cmd/shell_local.go @@ -1098,7 +1098,7 @@ type dbConfig interface { MaxOpenConns() int MaxIdleConns() int URL() url.URL - Dialect() pgcommon.DialectName + Dialect() string } func newConnection(ctx context.Context, cfg dbConfig) (*sqlx.DB, error) { diff --git a/core/cmd/shell_local_test.go b/core/cmd/shell_local_test.go index 5c9449b2107..6fa868d2b78 100644 --- a/core/cmd/shell_local_test.go +++ b/core/cmd/shell_local_test.go @@ -46,7 +46,7 @@ import ( func genTestEVMRelayers(t *testing.T, opts legacyevm.ChainRelayOpts, ks evmrelayer.CSAETHKeystore) *chainlink.CoreRelayerChainInteroperators { f := chainlink.RelayerFactory{ Logger: opts.Logger, - LoopRegistry: plugins.NewLoopRegistry(opts.Logger, opts.AppConfig.Tracing(), opts.AppConfig.Telemetry(), nil, ""), + LoopRegistry: plugins.NewLoopRegistry(opts.Logger, opts.AppConfig.Database(), opts.AppConfig.Tracing(), opts.AppConfig.Telemetry(), nil, ""), CapabilitiesRegistry: capabilities.NewRegistry(opts.Logger), } diff --git a/core/cmd/shell_test.go b/core/cmd/shell_test.go index e73e1d51f24..decb092a249 100644 --- a/core/cmd/shell_test.go +++ b/core/cmd/shell_test.go @@ -353,7 +353,7 @@ func TestNewUserCache(t *testing.T) { func TestSetupSolanaRelayer(t *testing.T) { lggr := logger.TestLogger(t) - reg := plugins.NewLoopRegistry(lggr, nil, nil, nil, "") + reg := plugins.NewTestLoopRegistry(lggr) ks := mocks.NewSolana(t) ds := sqltest.NewNoOpDataSource() @@ -483,7 +483,7 @@ func TestSetupSolanaRelayer(t *testing.T) { func TestSetupStarkNetRelayer(t *testing.T) { lggr := logger.TestLogger(t) - reg := plugins.NewLoopRegistry(lggr, nil, nil, nil, "") + reg := plugins.NewTestLoopRegistry(lggr) ks := mocks.NewStarkNet(t) // config 3 chains but only enable 2 => should only be 2 relayer nEnabledChains := 2 diff --git a/core/config/database_config.go b/core/config/database_config.go index 56f8f8165d4..20183779d34 100644 --- a/core/config/database_config.go +++ b/core/config/database_config.go @@ -3,8 +3,6 @@ package config import ( "net/url" "time" - - pgcommon "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg" ) type Backup interface { @@ -35,7 +33,7 @@ type Database interface { DefaultIdleInTxSessionTimeout() time.Duration DefaultLockTimeout() time.Duration DefaultQueryTimeout() time.Duration - Dialect() pgcommon.DialectName + Dialect() string LogSQL() bool MaxIdleConns() int MaxOpenConns() int diff --git a/core/config/toml/types.go b/core/config/toml/types.go index 620f7d96eee..38019df3f01 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -16,8 +16,6 @@ import ( ocrcommontypes "github.com/smartcontractkit/libocr/commontypes" commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" - pgcommon "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg" - "github.com/smartcontractkit/chainlink/v2/core/build" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/config" @@ -338,7 +336,7 @@ type Database struct { DefaultIdleInTxSessionTimeout *commonconfig.Duration DefaultLockTimeout *commonconfig.Duration DefaultQueryTimeout *commonconfig.Duration - Dialect pgcommon.DialectName `toml:"-"` + Dialect string `toml:"-"` LogQueries *bool MaxIdleConns *int64 MaxOpenConns *int64 diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 5780410ab60..3941b765f9d 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -396,7 +396,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn keyStore := keystore.NewInMemory(ds, utils.FastScryptParams, lggr) mailMon := mailbox.NewMonitor(cfg.AppID().String(), lggr.Named("Mailbox")) - loopRegistry := plugins.NewLoopRegistry(lggr, nil, nil, nil, "") + loopRegistry := plugins.NewTestLoopRegistry(lggr) mercuryPool := wsrpc.NewPool(lggr, cache.Config{ LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(), @@ -498,7 +498,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn RestrictedHTTPClient: c, UnrestrictedHTTPClient: c, SecretGenerator: MockSecretGenerator{}, - LoopRegistry: plugins.NewLoopRegistry(lggr, nil, nil, nil, ""), + LoopRegistry: plugins.NewTestLoopRegistry(lggr), MercuryPool: mercuryPool, CapabilitiesRegistry: capabilitiesRegistry, CapabilitiesDispatcher: dispatcher, diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 06289b13fd4..2ca09c1b733 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -402,7 +402,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { if err != nil { return nil, fmt.Errorf("could not build Beholder auth: %w", err) } - loopRegistry = plugins.NewLoopRegistry(globalLogger, opts.Config.Tracing(), opts.Config.Telemetry(), beholderAuthHeaders, csaPubKeyHex) + loopRegistry = plugins.NewLoopRegistry(globalLogger, opts.Config.Database(), opts.Config.Tracing(), opts.Config.Telemetry(), beholderAuthHeaders, csaPubKeyHex) } // If the audit logger is enabled diff --git a/core/services/chainlink/config_database.go b/core/services/chainlink/config_database.go index 27e61479146..7fc12aabf35 100644 --- a/core/services/chainlink/config_database.go +++ b/core/services/chainlink/config_database.go @@ -4,8 +4,6 @@ import ( "net/url" "time" - pgcommon "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg" - "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/config/toml" ) @@ -113,7 +111,7 @@ func (d *databaseConfig) URL() url.URL { return *d.s.URL.URL() } -func (d *databaseConfig) Dialect() pgcommon.DialectName { +func (d *databaseConfig) Dialect() string { return d.c.Dialect } diff --git a/core/services/chainlink/relayer_chain_interoperators_test.go b/core/services/chainlink/relayer_chain_interoperators_test.go index f03e172542c..4c7d1146ee0 100644 --- a/core/services/chainlink/relayer_chain_interoperators_test.go +++ b/core/services/chainlink/relayer_chain_interoperators_test.go @@ -176,7 +176,7 @@ func TestCoreRelayerChainInteroperators(t *testing.T) { factory := chainlink.RelayerFactory{ Logger: lggr, - LoopRegistry: plugins.NewLoopRegistry(lggr, nil, nil, nil, ""), + LoopRegistry: plugins.NewTestLoopRegistry(lggr), GRPCOpts: loop.GRPCOpts{}, CapabilitiesRegistry: capabilities.NewRegistry(lggr), } diff --git a/core/services/ocr2/plugins/ccip/testhelpers/integration/chainlink.go b/core/services/ocr2/plugins/ccip/testhelpers/integration/chainlink.go index b1c3677e497..7ffb0f24220 100644 --- a/core/services/ocr2/plugins/ccip/testhelpers/integration/chainlink.go +++ b/core/services/ocr2/plugins/ccip/testhelpers/integration/chainlink.go @@ -464,7 +464,7 @@ func setupNodeCCIP( beholderAuthHeaders, csaPubKeyHex, err := keystore.BuildBeholderAuth(keyStore) require.NoError(t, err) - loopRegistry := plugins.NewLoopRegistry(lggr.Named("LoopRegistry"), config.Tracing(), config.Telemetry(), beholderAuthHeaders, csaPubKeyHex) + loopRegistry := plugins.NewLoopRegistry(lggr.Named("LoopRegistry"), config.Database(), config.Tracing(), config.Telemetry(), beholderAuthHeaders, csaPubKeyHex) relayerFactory := chainlink.RelayerFactory{ Logger: lggr, LoopRegistry: loopRegistry, @@ -494,7 +494,7 @@ func setupNodeCCIP( RestrictedHTTPClient: &http.Client{}, AuditLogger: audit.NoopLogger, MailMon: mailMon, - LoopRegistry: plugins.NewLoopRegistry(lggr, config.Tracing(), config.Telemetry(), beholderAuthHeaders, csaPubKeyHex), + LoopRegistry: plugins.NewLoopRegistry(lggr, config.Database(), config.Tracing(), config.Telemetry(), beholderAuthHeaders, csaPubKeyHex), }) require.NoError(t, err) require.NoError(t, app.GetKeyStore().Unlock(ctx, "password")) diff --git a/core/services/ocr2/plugins/ccip/testhelpers/testhelpers_1_4_0/chainlink.go b/core/services/ocr2/plugins/ccip/testhelpers/testhelpers_1_4_0/chainlink.go index cec58136b5c..91592000420 100644 --- a/core/services/ocr2/plugins/ccip/testhelpers/testhelpers_1_4_0/chainlink.go +++ b/core/services/ocr2/plugins/ccip/testhelpers/testhelpers_1_4_0/chainlink.go @@ -460,7 +460,7 @@ func setupNodeCCIP( }, CSAETHKeystore: simEthKeyStore, } - loopRegistry := plugins.NewLoopRegistry(lggr.Named("LoopRegistry"), config.Tracing(), config.Telemetry(), nil, "") + loopRegistry := plugins.NewLoopRegistry(lggr.Named("LoopRegistry"), config.Database(), config.Tracing(), config.Telemetry(), nil, "") relayerFactory := chainlink.RelayerFactory{ Logger: lggr, LoopRegistry: loopRegistry, @@ -490,7 +490,7 @@ func setupNodeCCIP( RestrictedHTTPClient: &http.Client{}, AuditLogger: audit.NoopLogger, MailMon: mailMon, - LoopRegistry: plugins.NewLoopRegistry(lggr, config.Tracing(), config.Telemetry(), nil, ""), + LoopRegistry: plugins.NewLoopRegistry(lggr, config.Database(), config.Tracing(), config.Telemetry(), nil, ""), }) ctx := testutils.Context(t) require.NoError(t, err) diff --git a/core/services/ocr2/plugins/mercury/plugin_test.go b/core/services/ocr2/plugins/mercury/plugin_test.go index eb67da53100..71cfabce303 100644 --- a/core/services/ocr2/plugins/mercury/plugin_test.go +++ b/core/services/ocr2/plugins/mercury/plugin_test.go @@ -253,7 +253,7 @@ func TestNewServices(t *testing.T) { t.Run("restartable loop", func(t *testing.T) { // setup a real loop registry to test restartability - registry := plugins.NewLoopRegistry(logger.TestLogger(t), nil, nil, nil, "") + registry := plugins.NewTestLoopRegistry(logger.TestLogger(t)) loopRegistrarConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, registry.Register, registry.Unregister) prodCfg := mercuryocr2.NewMercuryConfig(1, 1, loopRegistrarConfig) type args struct { diff --git a/core/services/pg/connection.go b/core/services/pg/connection.go index bf3663e82ce..d9c4a21b279 100644 --- a/core/services/pg/connection.go +++ b/core/services/pg/connection.go @@ -2,30 +2,20 @@ package pg import ( "context" - "database/sql" "errors" "fmt" "log" "os" "time" - "github.com/XSAM/otelsql" - "github.com/google/uuid" "github.com/jackc/pgconn" - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/stdlib" + _ "github.com/jackc/pgx/v4/stdlib" // need to make sure pgx driver is registered before opening connection "github.com/jmoiron/sqlx" - "github.com/scylladb/go-reflectx" - "go.opentelemetry.io/otel" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" - pgcommon "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg" + commonpg "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/sqltest" ) -// NOTE: This is the default level in Postgres anyway, we just make it -// explicit here -const defaultIsolation = sql.LevelReadCommitted - var MinRequiredPGVersion = 110000 func init() { @@ -51,67 +41,22 @@ type ConnectionConfig interface { MaxIdleConns() int } -func NewConnection(ctx context.Context, uri string, dialect pgcommon.DialectName, config ConnectionConfig) (*sqlx.DB, error) { - opts := []otelsql.Option{otelsql.WithAttributes(semconv.DBSystemPostgreSQL), - otelsql.WithTracerProvider(otel.GetTracerProvider()), - otelsql.WithSQLCommenter(true), - otelsql.WithSpanOptions(otelsql.SpanOptions{ - OmitConnResetSession: true, - OmitConnPrepare: true, - OmitRows: true, - OmitConnectorConnect: true, - OmitConnQuery: false, - })} - - // Set default connection options - lockTimeout := config.DefaultLockTimeout().Milliseconds() - idleInTxSessionTimeout := config.DefaultIdleInTxSessionTimeout().Milliseconds() - connParams := fmt.Sprintf(`SET TIME ZONE 'UTC'; SET lock_timeout = %d; SET idle_in_transaction_session_timeout = %d; SET default_transaction_isolation = %q`, - lockTimeout, idleInTxSessionTimeout, defaultIsolation.String()) - - var sqldb *sql.DB - if dialect == pgcommon.TransactionWrappedPostgres { - // Dbtx uses the uri as a unique identifier for each transaction. Each ORM - // should be encapsulated in it's own transaction, and thus needs its own - // unique id. - // - // We can happily throw away the original uri here because if we are using - // txdb it should have already been set at the point where we called - // txdb.Register - - err := pgcommon.RegisterTxDb(uri) - if err != nil { - return nil, fmt.Errorf("failed to register txdb: %w", err) - } - sqldb, err = otelsql.Open(string(dialect), uuid.New().String(), opts...) - if err != nil { - return nil, fmt.Errorf("failed to open txdb: %w", err) - } - _, err = sqldb.ExecContext(ctx, connParams) - if err != nil { - return nil, fmt.Errorf("failed to set options: %w", err) - } - } else { - // Set sane defaults for every new database connection. - // Those can be overridden with Txn options or SET statements in individual connections. - // The default values are the same for Txns. - connConfig, err := pgx.ParseConfig(uri) - if err != nil { - return nil, fmt.Errorf("database: failed to parse config: %w", err) +func NewConnection(ctx context.Context, uri string, driverName string, config ConnectionConfig) (db *sqlx.DB, err error) { + if driverName == commonpg.DriverTxWrappedPostgres { + if err = sqltest.RegisterTxDB(uri); err != nil { + return nil, fmt.Errorf("failed to register %s: %w", commonpg.DriverTxWrappedPostgres, err) } - - connector := stdlib.GetConnector(*connConfig, stdlib.OptionAfterConnect(func(ctx context.Context, c *pgx.Conn) (err error) { - _, err = c.Exec(ctx, connParams) - return - })) - - // Initialize sql/sqlx - sqldb = otelsql.OpenDB(connector, opts...) } - db := sqlx.NewDb(sqldb, string(dialect)) - db.MapperFunc(reflectx.CamelToSnakeASCII) - - setMaxConns(db, config) + db, err = commonpg.DBConfig{ + IdleInTxSessionTimeout: config.DefaultIdleInTxSessionTimeout(), + LockTimeout: config.DefaultLockTimeout(), + MaxOpenConns: config.MaxOpenConns(), + MaxIdleConns: config.MaxIdleConns(), + }.New(ctx, uri, driverName) + if err != nil { + return nil, err + } + setMaxMercuryConns(db, config) if os.Getenv("SKIP_PG_VERSION_CHECK") != "true" { if err := checkVersion(db, MinRequiredPGVersion); err != nil { @@ -119,13 +64,10 @@ func NewConnection(ctx context.Context, uri string, dialect pgcommon.DialectName } } - return db, disallowReplica(db) + return db, nil } -func setMaxConns(db *sqlx.DB, config ConnectionConfig) { - db.SetMaxOpenConns(config.MaxOpenConns()) - db.SetMaxIdleConns(config.MaxIdleConns()) - +func setMaxMercuryConns(db *sqlx.DB, config ConnectionConfig) { // HACK: In the case of mercury jobs, one conn is needed per job for good // performance. Most nops will forget to increase the defaults to account // for this so we detect it here instead. diff --git a/core/services/pg/locked_db.go b/core/services/pg/locked_db.go index baea01b43a5..ebdd0786b38 100644 --- a/core/services/pg/locked_db.go +++ b/core/services/pg/locked_db.go @@ -11,8 +11,6 @@ import ( "github.com/jmoiron/sqlx" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg" - "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/static" @@ -29,7 +27,7 @@ type LockedDBConfig interface { ConnectionConfig URL() url.URL DefaultQueryTimeout() time.Duration - Dialect() pg.DialectName + Dialect() string } type lockedDb struct { @@ -73,14 +71,14 @@ func (l *lockedDb) Open(ctx context.Context) (err error) { } revert := func() { // Let Open() return the actual error, while l.Close() error is just logged. - if err2 := l.close(); err2 != nil { + if err2 := l.Close(); err2 != nil { l.lggr.Errorf("failed to cleanup LockedDB: %v", err2) } } // Step 2: start the stat reporter l.statsReporter = NewStatsReporter(l.db.Stats, l.lggr) - l.statsReporter.Start(ctx) + l.statsReporter.Start() // Step 3: acquire DB locks lockingMode := l.lockCfg.LockingMode() diff --git a/core/services/pg/stats.go b/core/services/pg/stats.go index b8b1ed68401..3aa40580a90 100644 --- a/core/services/pg/stats.go +++ b/core/services/pg/stats.go @@ -1,132 +1,17 @@ package pg import ( - "context" - "database/sql" - "sync" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - + commonpg "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg" "github.com/smartcontractkit/chainlink/v2/core/logger" ) -const dbStatsInternal = 10 * time.Second - -var ( - promDBConnsMax = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "db_conns_max", - Help: "Maximum number of open connections to the database.", - }) - promDBConnsOpen = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "db_conns_open", - Help: "The number of established connections both in use and idle.", - }) - promDBConnsInUse = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "db_conns_used", - Help: "The number of connections currently in use.", - }) - promDBWaitCount = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "db_wait_count", - Help: "The total number of connections waited for.", - }) - promDBWaitDuration = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "db_wait_time_seconds", - Help: "The total time blocked waiting for a new connection.", - }) -) - -func publishStats(stats sql.DBStats) { - promDBConnsMax.Set(float64(stats.MaxOpenConnections)) - promDBConnsOpen.Set(float64(stats.OpenConnections)) - promDBConnsInUse.Set(float64(stats.InUse)) - - promDBWaitCount.Set(float64(stats.WaitCount)) - promDBWaitDuration.Set(stats.WaitDuration.Seconds()) -} - -type StatsReporterOpt func(*StatsReporter) - -func StatsInterval(d time.Duration) StatsReporterOpt { - return func(r *StatsReporter) { - r.interval = d - } -} - -func StatsCustomReporterFn(fn ReportFn) StatsReporterOpt { - return func(r *StatsReporter) { - r.reportFn = fn - } -} - type ( - StatFn func() sql.DBStats - ReportFn func(sql.DBStats) + StatFn = commonpg.StatFn + ReportFn = commonpg.ReportFn ) -type StatsReporter struct { - statFn StatFn - reportFn ReportFn - interval time.Duration - cancel context.CancelFunc - lggr logger.Logger - once sync.Once - wg sync.WaitGroup -} - -func NewStatsReporter(fn StatFn, lggr logger.Logger, opts ...StatsReporterOpt) *StatsReporter { - r := &StatsReporter{ - statFn: fn, - reportFn: publishStats, - interval: dbStatsInternal, - lggr: lggr.Named("StatsReporter"), - } - - for _, opt := range opts { - opt(r) - } - - return r -} - -func (r *StatsReporter) Start(ctx context.Context) { - startOnce := func() { - r.wg.Add(1) - r.lggr.Debug("Starting DB stat reporter") - rctx, cancelFunc := context.WithCancel(ctx) - r.cancel = cancelFunc - go r.loop(rctx) - } - - r.once.Do(startOnce) -} - -// Stop stops all resources owned by the reporter and waits -// for all of them to be done -func (r *StatsReporter) Stop() { - if r.cancel != nil { - r.lggr.Debug("Stopping DB stat reporter") - r.cancel() - r.cancel = nil - r.wg.Wait() - } -} - -func (r *StatsReporter) loop(ctx context.Context) { - defer r.wg.Done() - - ticker := time.NewTicker(r.interval) - defer ticker.Stop() +type StatsReporter = commonpg.StatsReporter - r.reportFn(r.statFn()) - for { - select { - case <-ticker.C: - r.reportFn(r.statFn()) - case <-ctx.Done(): - r.lggr.Debug("stat reporter loop received done. stopping...") - return - } - } +func NewStatsReporter(fn StatFn, lggr logger.Logger, opts ...commonpg.StatsReporterOpt) *StatsReporter { + return commonpg.NewStatsReporter(fn, lggr, opts...) } diff --git a/core/services/pg/stats_test.go b/core/services/pg/stats_test.go deleted file mode 100644 index 76a8b426fd8..00000000000 --- a/core/services/pg/stats_test.go +++ /dev/null @@ -1,133 +0,0 @@ -package pg - -import ( - "context" - "database/sql" - "strings" - "testing" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/stretchr/testify/mock" - - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/logger" -) - -// testDbStater implements mocks for the function signatures -// needed by the stat reporte wrapper for statFn -type testDbStater struct { - mock.Mock - t *testing.T - name string - testGauge prometheus.Gauge -} - -func newtestDbStater(t *testing.T, name string) *testDbStater { - return &testDbStater{ - t: t, - name: name, - testGauge: promauto.NewGauge(prometheus.GaugeOpts{ - Name: strings.ReplaceAll(name, " ", "_"), - }), - } -} - -func (s *testDbStater) Stats() sql.DBStats { - s.Called() - return sql.DBStats{} -} - -func (s *testDbStater) Report(stats sql.DBStats) { - s.Called() - s.testGauge.Set(float64(stats.MaxOpenConnections)) -} - -type statScenario struct { - name string - testFn func(*testing.T, *StatsReporter, time.Duration, int) -} - -func TestStatReporter(t *testing.T) { - interval := 2 * time.Millisecond - expectedIntervals := 4 - - lggr := logger.TestLogger(t) - - for _, scenario := range []statScenario{ - {name: "parent_ctx_canceled", testFn: testParentContextCanceled}, - {name: "normal_collect_and_stop", testFn: testCollectAndStop}, - {name: "mutli_start", testFn: testMultiStart}, - {name: "multi_stop", testFn: testMultiStop}, - } { - t.Run(scenario.name, func(t *testing.T) { - d := newtestDbStater(t, scenario.name) - d.Mock.On("Stats").Return(sql.DBStats{}) - d.Mock.On("Report").Return() - reporter := NewStatsReporter(d.Stats, - lggr, - StatsInterval(interval), - StatsCustomReporterFn(d.Report), - ) - - scenario.testFn( - t, - reporter, - interval, - expectedIntervals, - ) - - d.AssertCalled(t, "Stats") - d.AssertCalled(t, "Report") - }) - } -} - -// test appropriate handling of context cancellation -func testParentContextCanceled(t *testing.T, r *StatsReporter, interval time.Duration, n int) { - ctx := testutils.Context(t) - tctx, cancel := context.WithTimeout(ctx, time.Duration(n)*interval) - - r.Start(tctx) - defer r.Stop() - // wait for parent cancelation - <-tctx.Done() - // call cancel to statisy linter - cancel() -} - -// test normal stop -func testCollectAndStop(t *testing.T, r *StatsReporter, interval time.Duration, n int) { - ctx := testutils.Context(t) - - r.Start(ctx) - time.Sleep(time.Duration(n) * interval) - r.Stop() -} - -// test multiple start calls are idempotent -func testMultiStart(t *testing.T, r *StatsReporter, interval time.Duration, n int) { - ctx := testutils.Context(t) - - ticker := time.NewTicker(time.Duration(n) * interval) - defer ticker.Stop() - - r.Start(ctx) - r.Start(ctx) - <-ticker.C - r.Stop() -} - -// test multiple stop calls are idempotent -func testMultiStop(t *testing.T, r *StatsReporter, interval time.Duration, n int) { - ctx := testutils.Context(t) - - ticker := time.NewTicker(time.Duration(n) * interval) - defer ticker.Stop() - - r.Start(ctx) - <-ticker.C - r.Stop() - r.Stop() -} diff --git a/core/static/static.go b/core/static/static.go index f840331bc99..a8a47899802 100644 --- a/core/static/static.go +++ b/core/static/static.go @@ -6,6 +6,8 @@ import ( "time" "github.com/google/uuid" + + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg" ) // Version and Sha are set at compile time via build arguments. @@ -40,17 +42,11 @@ func buildPrettyVersion() string { // SetConsumerName sets a nicely formatted application_name on the // database uri func SetConsumerName(uri *url.URL, name string, id *uuid.UUID) { - q := uri.Query() - applicationName := fmt.Sprintf("Chainlink%s|%s", buildPrettyVersion(), name) if id != nil { applicationName += fmt.Sprintf("|%s", id.String()) } - if len(applicationName) > 63 { - applicationName = applicationName[:63] - } - q.Set("application_name", applicationName) - uri.RawQuery = q.Encode() + pg.SetApplicationName(uri, applicationName) } // Short returns a 7-character sha prefix and version, or Unset if blank. diff --git a/core/web/loop_registry_internal_test.go b/core/web/loop_registry_internal_test.go index d1235cd09b4..fd5d9bee6ba 100644 --- a/core/web/loop_registry_internal_test.go +++ b/core/web/loop_registry_internal_test.go @@ -38,7 +38,7 @@ func TestLoopRegistryServer_CantWriteToResponse(t *testing.T) { l, o := logger.TestLoggerObserved(t, zap.ErrorLevel) s := &LoopRegistryServer{ exposedPromPort: 1, - registry: plugins.NewLoopRegistry(l, nil, nil, nil, ""), + registry: plugins.NewTestLoopRegistry(l), logger: l.(logger.SugaredLogger), jsonMarshalFn: json.Marshal, } @@ -53,7 +53,7 @@ func TestLoopRegistryServer_CantMarshal(t *testing.T) { l, o := logger.TestLoggerObserved(t, zap.ErrorLevel) s := &LoopRegistryServer{ exposedPromPort: 1, - registry: plugins.NewLoopRegistry(l, nil, nil, nil, ""), + registry: plugins.NewTestLoopRegistry(l), logger: l.(logger.SugaredLogger), jsonMarshalFn: func(any) ([]byte, error) { return []byte(""), errors.New("can't unmarshal") diff --git a/go.mod b/go.mod index 1cad28ea09d..e18b7a5898f 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/Masterminds/sprig/v3 v3.2.3 github.com/NethermindEth/juno v0.3.1 github.com/NethermindEth/starknet.go v0.7.1-0.20240401080518-34a506f3cfdb - github.com/XSAM/otelsql v0.27.0 + github.com/XSAM/otelsql v0.29.0 github.com/andybalholm/brotli v1.1.1 github.com/avast/retry-go/v4 v4.6.0 github.com/btcsuite/btcd/btcec/v2 v2.3.4 @@ -80,8 +80,8 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.34 github.com/smartcontractkit/chainlink-automation v0.8.1 github.com/smartcontractkit/chainlink-ccip v0.0.0-20250110181647-9dba278f2103 - github.com/smartcontractkit/chainlink-common v0.4.1-0.20250115094325-4e61572bb9bd - github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e + github.com/smartcontractkit/chainlink-common v0.4.1-0.20250116025035-3ae84ca1456a + github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250116025556-e85fc2ec8b56 github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250113165937-53c02f2513d4 github.com/smartcontractkit/chainlink-feeds v0.1.1 github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20241220173418-09e17ddbeb20 diff --git a/go.sum b/go.sum index 10c6bc0e667..40c9587ab33 100644 --- a/go.sum +++ b/go.sum @@ -133,6 +133,7 @@ github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrd github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/XSAM/otelsql v0.27.0 h1:i9xtxtdcqXV768a5C6SoT/RkG+ue3JTOgkYInzlTOqs= github.com/XSAM/otelsql v0.27.0/go.mod h1:0mFB3TvLa7NCuhm/2nU7/b2wEtsczkj8Rey8ygO7V+A= +github.com/XSAM/otelsql v0.29.0/go.mod h1:d3/0xGIGC5RVEE+Ld7KotwaLy6zDeaF3fLJHOPpdN2w= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -1156,8 +1157,10 @@ github.com/smartcontractkit/chainlink-ccip v0.0.0-20250110181647-9dba278f2103 h1 github.com/smartcontractkit/chainlink-ccip v0.0.0-20250110181647-9dba278f2103/go.mod h1:ncjd6mPZSRlelEqH/2KeLE1pU3UlqzBSn8RYkEoECzY= github.com/smartcontractkit/chainlink-common v0.4.1-0.20250115094325-4e61572bb9bd h1:SX16W7pqXGyn6fHFgtlr/rUdLZzBtQ5O3Gt3a6sFL70= github.com/smartcontractkit/chainlink-common v0.4.1-0.20250115094325-4e61572bb9bd/go.mod h1:yti7e1+G9hhkYhj+L5sVUULn9Bn3bBL5/AxaNqdJ5YQ= +github.com/smartcontractkit/chainlink-common v0.4.1-0.20250116025035-3ae84ca1456a/go.mod h1:V3BHfvLnQNBUoZ4bGjD29ZPhyzPE++DkYkhvPb9tcRs= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk= +github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250116025556-e85fc2ec8b56/go.mod h1:t72AMBAVyfU5qWEAzaD2/U7xmFsZxcAZw6cyCFE7ouo= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250113165937-53c02f2513d4 h1:8qPzgbMGGn6CxQe/cjWvBgNKAxOL+brZZV+xYoY5+GE= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250113165937-53c02f2513d4/go.mod h1:pDZagSGjs9U+l4YIFhveDznMHqxuuz+5vRxvVgpbdr8= github.com/smartcontractkit/chainlink-feeds v0.1.1 h1:JzvUOM/OgGQA1sOqTXXl52R6AnNt+Wg64sVG+XSA49c= diff --git a/plugins/loop_registry.go b/plugins/loop_registry.go index 82ef219566a..9658be13940 100644 --- a/plugins/loop_registry.go +++ b/plugins/loop_registry.go @@ -28,16 +28,18 @@ type LoopRegistry struct { registry map[string]*RegisteredLoop lggr logger.Logger + cfgDatabase config.Database cfgTracing config.Tracing cfgTelemetry config.Telemetry telemetryAuthHeaders map[string]string telemetryAuthPubKeyHex string } -func NewLoopRegistry(lggr logger.Logger, tracing config.Tracing, telemetry config.Telemetry, telemetryAuthHeaders map[string]string, telemetryAuthPubKeyHex string) *LoopRegistry { +func NewLoopRegistry(lggr logger.Logger, dbConfig config.Database, tracing config.Tracing, telemetry config.Telemetry, telemetryAuthHeaders map[string]string, telemetryAuthPubKeyHex string) *LoopRegistry { return &LoopRegistry{ registry: map[string]*RegisteredLoop{}, lggr: logger.Named(lggr, "LoopRegistry"), + cfgDatabase: dbConfig, cfgTracing: tracing, cfgTelemetry: telemetry, telemetryAuthHeaders: telemetryAuthHeaders, @@ -45,6 +47,13 @@ func NewLoopRegistry(lggr logger.Logger, tracing config.Tracing, telemetry confi } } +func NewTestLoopRegistry(lggr logger.Logger) *LoopRegistry { + return &LoopRegistry{ + registry: map[string]*RegisteredLoop{}, + lggr: logger.Named(lggr, "LoopRegistry"), + } +} + // Register creates a port of the plugin. It is not idempotent. Duplicate calls to Register will return [ErrExists] // Safe for concurrent use. func (m *LoopRegistry) Register(id string) (*RegisteredLoop, error) { @@ -63,6 +72,17 @@ func (m *LoopRegistry) Register(id string) (*RegisteredLoop, error) { } envCfg := loop.EnvConfig{PrometheusPort: ports[0]} + if m.cfgDatabase != nil { + dbURL := m.cfgDatabase.URL() + envCfg.DatabaseURL = &dbURL + envCfg.DatabaseIdleInTxSessionTimeout = m.cfgDatabase.DefaultIdleInTxSessionTimeout() + envCfg.DatabaseLockTimeout = m.cfgDatabase.DefaultLockTimeout() + envCfg.DatabaseQueryTimeout = m.cfgDatabase.DefaultQueryTimeout() + envCfg.DatabaseLogSQL = m.cfgDatabase.LogSQL() + envCfg.DatabaseMaxOpenConns = m.cfgDatabase.MaxOpenConns() + envCfg.DatabaseMaxIdleConns = m.cfgDatabase.MaxIdleConns() + } + if m.cfgTracing != nil { envCfg.TracingEnabled = m.cfgTracing.Enabled() envCfg.TracingCollectorTarget = m.cfgTracing.CollectorTarget() diff --git a/plugins/loop_registry_test.go b/plugins/loop_registry_test.go index c7484b7aca9..2a9c9cf6dcb 100644 --- a/plugins/loop_registry_test.go +++ b/plugins/loop_registry_test.go @@ -1,18 +1,21 @@ package plugins import ( + "net/url" "testing" "time" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-common/pkg/loop" + + "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/logger" ) func TestPluginPortManager(t *testing.T) { // register one - m := NewLoopRegistry(logger.TestLogger(t), nil, nil, nil, "") + m := NewTestLoopRegistry(logger.TestLogger(t)) pFoo, err := m.Register("foo") require.NoError(t, err) require.Equal(t, "foo", pFoo.Name) @@ -60,7 +63,36 @@ func (m mockCfgTelemetry) EmitterBatchProcessor() bool { return true } func (m mockCfgTelemetry) EmitterExportTimeout() time.Duration { return 1 * time.Second } +type mockCfgDatabase struct{} + +func (m mockCfgDatabase) Backup() config.Backup { panic("unimplemented") } + +func (m mockCfgDatabase) Listener() config.Listener { panic("unimplemented") } + +func (m mockCfgDatabase) Lock() config.Lock { panic("unimplemented") } + +func (m mockCfgDatabase) DefaultIdleInTxSessionTimeout() time.Duration { return time.Hour } + +func (m mockCfgDatabase) DefaultLockTimeout() time.Duration { return time.Minute } + +func (m mockCfgDatabase) DefaultQueryTimeout() time.Duration { return time.Second } + +func (m mockCfgDatabase) Dialect() string { panic("unimplemented") } + +func (m mockCfgDatabase) LogSQL() bool { return true } + +func (m mockCfgDatabase) MaxIdleConns() int { return 99 } + +func (m mockCfgDatabase) MaxOpenConns() int { return 42 } + +func (m mockCfgDatabase) MigrateDatabase() bool { panic("unimplemented") } + +func (m mockCfgDatabase) URL() url.URL { + return url.URL{Scheme: "fake", Host: "database.url"} +} + func TestLoopRegistry_Register(t *testing.T) { + mockCfgDatabase := &mockCfgDatabase{} mockCfgTracing := &mockCfgTracing{} mockCfgTelemetry := &mockCfgTelemetry{} registry := make(map[string]*RegisteredLoop) @@ -69,6 +101,7 @@ func TestLoopRegistry_Register(t *testing.T) { loopRegistry := &LoopRegistry{ lggr: logger.TestLogger(t), registry: registry, + cfgDatabase: mockCfgDatabase, cfgTracing: mockCfgTracing, cfgTelemetry: mockCfgTelemetry, } @@ -79,6 +112,15 @@ func TestLoopRegistry_Register(t *testing.T) { require.Equal(t, "testID", registeredLoop.Name) envCfg := registeredLoop.EnvCfg + + require.Equal(t, &url.URL{Scheme: "fake", Host: "database.url"}, envCfg.DatabaseURL) + require.Equal(t, time.Hour, envCfg.DatabaseIdleInTxSessionTimeout) + require.Equal(t, time.Minute, envCfg.DatabaseLockTimeout) + require.Equal(t, time.Second, envCfg.DatabaseQueryTimeout) + require.Equal(t, true, envCfg.DatabaseLogSQL) + require.Equal(t, 42, envCfg.DatabaseMaxOpenConns) + require.Equal(t, 99, envCfg.DatabaseMaxIdleConns) + require.True(t, envCfg.TracingEnabled) require.Equal(t, "http://localhost:9000", envCfg.TracingCollectorTarget) require.Equal(t, map[string]string{"attribute": "value"}, envCfg.TracingAttributes)