Skip to content

Commit

Permalink
[KS-430] Provide an OracleFactory to StandardCapabilities (#14305)
Browse files Browse the repository at this point in the history
* --wip-- [skip CI]

* --wip-- [skip CI]

* Add bootstrap peers via capabilities spec

* Add peer wrapper to Oracle Factory

* Add comment explaining delegate position

* Cleanup

* Create a key bundle if none exists

* Use PR #738 hash

* Fix bad merge

* Bump chainlink-common

* Use in-memory DB for OCR persistance

* Also add eth pubkey

* Oracle instance spawned

* Bump chainlink-common version

* Undo some changes

* Add changeset + fix interface

* Update changeset

* Use chainlink-common commit from main branch

* Add oracle_factory config to SC spec

* Undo a change

* Things work as is. Checkpoint.

* Update comments. Remove redundant log.

* Remove redundant test and bring back log line

* Remove redundant JSONConfig

* Implement oracle factory transmitter

* Stop using pkg/errors

* Naming convention

* Woops

* Fix lint errors

* Tidy
  • Loading branch information
DeividasK authored Oct 9, 2024
1 parent dbd42db commit 5ca0d1f
Show file tree
Hide file tree
Showing 21 changed files with 621 additions and 39 deletions.
5 changes: 5 additions & 0 deletions .changeset/chilled-months-bow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added oracle support in standard capabilities
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/prometheus/client_golang v1.20.0
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v1.0.4
github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd
github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7
github.com/spf13/cobra v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1099,8 +1099,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8um
github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb h1:bYT7j5syymCtAKv/gGG7CLp4APe/VDWkig9Ph7mc8fQ=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw=
github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd h1:DraA9kRpkyI02OEx7QDbSq3bCYxVFZ78TdOP2rUvHts=
github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd/go.mod h1:F6WUS6N4mP5ScwpwyTyAJc9/vjR+GXbMCRUOVekQi1g=
github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d h1:GpGKzVtDWSb8cesHSvm+LQpCJpFdfVPk3us5wAY6ixI=
github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d/go.mod h1:+yz2So1Lp90C3ATfMmqMEJaIr3zfG8e4xHa/3u1kIDk=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q=
Expand Down
23 changes: 14 additions & 9 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,15 +453,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
pipelineRunner,
cfg.JobPipeline(),
),
job.StandardCapabilities: standardcapabilities.NewDelegate(
globalLogger,
opts.DS, jobORM,
opts.CapabilitiesRegistry,
loopRegistrarConfig,
telemetryManager,
pipelineRunner,
opts.RelayerChainInteroperators,
gatewayConnectorWrapper),
}
webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner()
)
Expand Down Expand Up @@ -501,6 +492,20 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
return nil, fmt.Errorf("P2P stack required for OCR or OCR2")
}

// If peer wrapper is initialized, Oracle Factory dependency will be available to standard capabilities
delegates[job.StandardCapabilities] = standardcapabilities.NewDelegate(
globalLogger,
opts.DS, jobORM,
opts.CapabilitiesRegistry,
loopRegistrarConfig,
telemetryManager,
pipelineRunner,
opts.RelayerChainInteroperators,
gatewayConnectorWrapper,
keyStore,
peerWrapper,
)

if cfg.OCR().Enabled() {
delegates[job.OffchainReporting] = ocr.NewDelegate(
opts.DS,
Expand Down
33 changes: 28 additions & 5 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,12 +948,35 @@ func (w *WorkflowSpec) RawSpec(ctx context.Context) ([]byte, error) {
return rs, nil
}

type OracleFactoryConfig struct {
Enabled bool `toml:"enabled"`
BootstrapPeers []string `toml:"bootstrap_peers"` // e.g.,["12D3KooWEBVwbfdhKnicois7FTYVsBFGFcoMhMCKXQC57BQyZMhz@localhost:6690"]
OCRContractAddress string `toml:"ocr_contract_address"` // e.g., 0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6
ChainID string `toml:"chain_id"` // e.g., "31337"
Network string `toml:"network"` // e.g., "evm"
}

// Value returns this instance serialized for database storage.
func (ofc OracleFactoryConfig) Value() (driver.Value, error) {
return json.Marshal(ofc)
}

// Scan reads the database value and returns an instance.
func (ofc *OracleFactoryConfig) Scan(value interface{}) error {
b, ok := value.([]byte)
if !ok {
return errors.Errorf("expected bytes got %T", b)
}
return json.Unmarshal(b, &ofc)
}

type StandardCapabilitiesSpec struct {
ID int32
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
Command string `toml:"command"`
Config string `toml:"config"`
ID int32
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
Command string `toml:"command" db:"command"`
Config string `toml:"config" db:"config"`
OracleFactory OracleFactoryConfig `toml:"oracle_factory" db:"oracle_factory"`
}

func (w *StandardCapabilitiesSpec) GetID() string {
Expand Down
4 changes: 2 additions & 2 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,8 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error {
}
jb.WorkflowSpecID = &specID
case StandardCapabilities:
sql := `INSERT INTO standardcapabilities_specs (command, config, created_at, updated_at)
VALUES (:command, :config, NOW(), NOW())
sql := `INSERT INTO standardcapabilities_specs (command, config, oracle_factory, created_at, updated_at)
VALUES (:command, :config, :oracle_factory, NOW(), NOW())
RETURNING id;`
specID, err := tx.prepareQuerySpecID(ctx, sql, jb.StandardCapabilitiesSpec)
if err != nil {
Expand Down
140 changes: 140 additions & 0 deletions core/services/ocr2/plugins/generic/oraclefactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package generic

import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/prometheus/client_golang/prometheus"
ocr "github.com/smartcontractkit/libocr/offchainreporting2plus"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ocr2key"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/telemetry"

"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
)

type oracleFactory struct {
database ocr3types.Database
jobID int32
jobName string
jobORM job.ORM
kb ocr2key.KeyBundle
lggr logger.Logger
config job.OracleFactoryConfig
peerWrapper *ocrcommon.SingletonPeerWrapper
relayerSet *RelayerSet
transmitterID string
}

type OracleFactoryParams struct {
JobID int32
JobName string
JobORM job.ORM
KB ocr2key.KeyBundle
Logger logger.Logger
Config job.OracleFactoryConfig
PeerWrapper *ocrcommon.SingletonPeerWrapper
RelayerSet *RelayerSet
TransmitterID string
}

func NewOracleFactory(params OracleFactoryParams) (core.OracleFactory, error) {
return &oracleFactory{
database: OracleFactoryDB(params.JobID, params.Logger),
jobID: params.JobID,
jobName: params.JobName,
jobORM: params.JobORM,
kb: params.KB,
lggr: params.Logger,
config: params.Config,
peerWrapper: params.PeerWrapper,
relayerSet: params.RelayerSet,
transmitterID: params.TransmitterID,
}, nil
}

func (of *oracleFactory) NewOracle(ctx context.Context, args core.OracleArgs) (core.Oracle, error) {
if !of.peerWrapper.IsStarted() {
return nil, errors.New("peer wrapper not started")
}

relayer, err := of.relayerSet.Get(ctx, types.RelayID{Network: of.config.Network, ChainID: of.config.ChainID})
if err != nil {
return nil, fmt.Errorf("error when getting relayer: %w", err)
}

var relayConfig = struct {
ChainID string `json:"chainID"`
EffectiveTransmitterID string `json:"effectiveTransmitterID"`
SendingKeys []string `json:"sendingKeys"`
}{
ChainID: of.config.ChainID,
EffectiveTransmitterID: of.transmitterID,
SendingKeys: []string{of.transmitterID},
}
relayConfigBytes, err := json.Marshal(relayConfig)
if err != nil {
return nil, fmt.Errorf("error when marshalling relay config: %w", err)
}

pluginProvider, err := relayer.NewPluginProvider(ctx, core.RelayArgs{
ContractID: of.config.OCRContractAddress,
ProviderType: "plugin",
RelayConfig: relayConfigBytes,
}, core.PluginArgs{
TransmitterID: of.transmitterID,
})
if err != nil {
return nil, fmt.Errorf("error when getting offchain digester: %w", err)
}

bootstrapPeers, err := ocrcommon.ParseBootstrapPeers(of.config.BootstrapPeers)
if err != nil {
return nil, fmt.Errorf("failed to parse bootstrap peers: %w", err)
}

oracle, err := ocr.NewOracle(ocr.OCR3OracleArgs[[]byte]{
// We are relying on the relayer plugin provider for the offchain config digester
// and the contract config tracker to save time.
ContractConfigTracker: pluginProvider.ContractConfigTracker(),
OffchainConfigDigester: pluginProvider.OffchainConfigDigester(),
LocalConfig: args.LocalConfig,
ContractTransmitter: NewContractTransmitter(of.transmitterID, args.ContractTransmitter),
ReportingPluginFactory: args.ReportingPluginFactoryService,
BinaryNetworkEndpointFactory: of.peerWrapper.Peer2,
V2Bootstrappers: bootstrapPeers,
Database: of.database,
Logger: ocrcommon.NewOCRWrapper(of.lggr, true, func(ctx context.Context, msg string) {
logger.Sugared(of.lggr).ErrorIf(of.jobORM.RecordError(ctx, of.jobID, msg), "unable to record error")
}),
MonitoringEndpoint: &telemetry.NoopAgent{},
OffchainKeyring: of.kb,
OnchainKeyring: ocrcommon.NewOCR3OnchainKeyringAdapter(of.kb),
MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": of.jobName}, prometheus.DefaultRegisterer),
})

if err != nil {
return nil, fmt.Errorf("%w: failed to create new OCR oracle", err)
}

return &adaptedOracle{oracle: oracle}, nil
}

type adaptedOracle struct {
oracle ocr.Oracle
}

func (a *adaptedOracle) Start(ctx context.Context) error {
return a.oracle.Start()
}

func (a *adaptedOracle) Close(ctx context.Context) error {
return a.oracle.Close()
}
135 changes: 135 additions & 0 deletions core/services/ocr2/plugins/generic/oraclefactorydb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package generic

import (
"context"
"encoding/json"
"fmt"
"time"

ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)

type oracleFactoryDb struct {
// The ID is used for logging and error messages
// A single standard capabilities spec can instantiate multiple oracles
// TODO: NewOracle should take a unique identifier for the oracle
specID int32
lggr logger.SugaredLogger
config *ocrtypes.ContractConfig
states map[ocrtypes.ConfigDigest]*ocrtypes.PersistentState
pendingTransmissions map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission
protocolStates map[ocrtypes.ConfigDigest]map[string][]byte
}

var (
_ ocrtypes.Database = &oracleFactoryDb{}
)

// NewDB returns a new DB scoped to this instanceID
func OracleFactoryDB(specID int32, lggr logger.Logger) *oracleFactoryDb {
return &oracleFactoryDb{
specID: specID,
lggr: logger.Sugared(lggr.Named("OracleFactoryMemoryDb")),
states: make(map[ocrtypes.ConfigDigest]*ocrtypes.PersistentState),
pendingTransmissions: make(map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission),
protocolStates: make(map[ocrtypes.ConfigDigest]map[string][]byte),
}
}

func (ofdb *oracleFactoryDb) ReadState(ctx context.Context, cd ocrtypes.ConfigDigest) (ps *ocrtypes.PersistentState, err error) {
ps, ok := ofdb.states[cd]
if !ok {
return nil, fmt.Errorf("state not found for standard capabilities spec ID %d, config digest %s", ofdb.specID, cd)
}

return ps, nil
}

func (ofdb *oracleFactoryDb) WriteState(ctx context.Context, cd ocrtypes.ConfigDigest, state ocrtypes.PersistentState) error {
ofdb.states[cd] = &state
return nil
}

func (ofdb *oracleFactoryDb) ReadConfig(ctx context.Context) (c *ocrtypes.ContractConfig, err error) {
if ofdb.config == nil {
// Returning nil, nil because this is a cache miss
return nil, nil
}
return ofdb.config, nil
}

func (ofdb *oracleFactoryDb) WriteConfig(ctx context.Context, c ocrtypes.ContractConfig) error {
ofdb.config = &c

cBytes, err := json.Marshal(c)
if err != nil {
return fmt.Errorf("MemoryDB: WriteConfig failed to marshal config: %w", err)
}

ofdb.lggr.Debugw("MemoryDB: WriteConfig", "ocrtypes.ContractConfig", string(cBytes))

return nil
}

func (ofdb *oracleFactoryDb) StorePendingTransmission(ctx context.Context, t ocrtypes.ReportTimestamp, tx ocrtypes.PendingTransmission) error {
ofdb.pendingTransmissions[t] = tx
return nil
}

func (ofdb *oracleFactoryDb) PendingTransmissionsWithConfigDigest(ctx context.Context, cd ocrtypes.ConfigDigest) (map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission, error) {
m := make(map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission)
for k, v := range ofdb.pendingTransmissions {
if k.ConfigDigest == cd {
m[k] = v
}
}

return m, nil
}

func (ofdb *oracleFactoryDb) DeletePendingTransmission(ctx context.Context, t ocrtypes.ReportTimestamp) error {
delete(ofdb.pendingTransmissions, t)
return nil
}

func (ofdb *oracleFactoryDb) DeletePendingTransmissionsOlderThan(ctx context.Context, t time.Time) error {
for k, v := range ofdb.pendingTransmissions {
if v.Time.Before(t) {
delete(ofdb.pendingTransmissions, k)
}
}

return nil
}

func (ofdb *oracleFactoryDb) ReadProtocolState(
ctx context.Context,
configDigest ocrtypes.ConfigDigest,
key string,
) ([]byte, error) {
value, ok := ofdb.protocolStates[configDigest][key]
if !ok {
// Previously implementation returned nil if the state is not found
return nil, nil
}
return value, nil
}

func (ofdb *oracleFactoryDb) WriteProtocolState(
ctx context.Context,
configDigest ocrtypes.ConfigDigest,
key string,
value []byte,
) error {
if value == nil {
delete(ofdb.protocolStates[configDigest], key)
} else {
if ofdb.protocolStates[configDigest] == nil {
ofdb.protocolStates[configDigest] = make(map[string][]byte)
}
ofdb.protocolStates[configDigest][key] = value
}
return nil
}
Loading

0 comments on commit 5ca0d1f

Please sign in to comment.