Skip to content

Commit

Permalink
Merge branch 'develop' into use-rmn-changeset-for-e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
carte7000 committed Dec 12, 2024
2 parents 8c13745 + 771151b commit 8cc2a3f
Show file tree
Hide file tree
Showing 11 changed files with 165 additions and 17 deletions.
1 change: 1 addition & 0 deletions core/services/ocr3/promwrapper/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (r ReportingPluginFactory[RI]) NewReportingPlugin(ctx context.Context, conf
config.ConfigDigest.String(),
promOCR3ReportsGenerated,
promOCR3Durations,
promOCR3Sizes,
promOCR3PluginStatus,
)
return wrapped, info, err
Expand Down
20 changes: 18 additions & 2 deletions core/services/ocr3/promwrapper/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type reportingPlugin[RI any] struct {
// Prometheus components for tracking metrics
reportsGenerated *prometheus.CounterVec
durations *prometheus.HistogramVec
sizes *prometheus.CounterVec
status *prometheus.GaugeVec
}

Expand All @@ -31,6 +32,7 @@ func newReportingPlugin[RI any](
configDigest string,
reportsGenerated *prometheus.CounterVec,
durations *prometheus.HistogramVec,
sizes *prometheus.CounterVec,
status *prometheus.GaugeVec,
) *reportingPlugin[RI] {
return &reportingPlugin[RI]{
Expand All @@ -40,6 +42,7 @@ func newReportingPlugin[RI any](
configDigest: configDigest,
reportsGenerated: reportsGenerated,
durations: durations,
sizes: sizes,
status: status,
}
}
Expand All @@ -51,9 +54,11 @@ func (p *reportingPlugin[RI]) Query(ctx context.Context, outctx ocr3types.Outcom
}

func (p *reportingPlugin[RI]) Observation(ctx context.Context, outctx ocr3types.OutcomeContext, query ocrtypes.Query) (ocrtypes.Observation, error) {
return withObservedExecution(p, observation, func() (ocrtypes.Observation, error) {
result, err := withObservedExecution(p, observation, func() (ocrtypes.Observation, error) {
return p.ReportingPlugin.Observation(ctx, outctx, query)
})
p.trackSize(observation, len(result), err)
return result, err
}

func (p *reportingPlugin[RI]) ValidateObservation(ctx context.Context, outctx ocr3types.OutcomeContext, query ocrtypes.Query, ao ocrtypes.AttributedObservation) error {
Expand All @@ -65,9 +70,11 @@ func (p *reportingPlugin[RI]) ValidateObservation(ctx context.Context, outctx oc
}

func (p *reportingPlugin[RI]) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, query ocrtypes.Query, aos []ocrtypes.AttributedObservation) (ocr3types.Outcome, error) {
return withObservedExecution(p, outcome, func() (ocr3types.Outcome, error) {
result, err := withObservedExecution(p, outcome, func() (ocr3types.Outcome, error) {
return p.ReportingPlugin.Outcome(ctx, outctx, query, aos)
})
p.trackSize(outcome, len(result), err)
return result, err
}

func (p *reportingPlugin[RI]) Reports(ctx context.Context, seqNr uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportPlus[RI], error) {
Expand Down Expand Up @@ -111,6 +118,15 @@ func (p *reportingPlugin[RI]) updateStatus(status bool) {
Set(float64(boolToInt(status)))
}

func (p *reportingPlugin[RI]) trackSize(function functionType, size int, err error) {
if err != nil {
return
}
p.sizes.
WithLabelValues(p.chainID, p.plugin, string(function)).
Add(float64(size))
}

func boolToInt(arg bool) int {
if arg {
return 1
Expand Down
40 changes: 29 additions & 11 deletions core/services/ocr3/promwrapper/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ import (
)

func Test_ReportsGeneratedGauge(t *testing.T) {
pluginObservationSize := 5
pluginOutcomeSize := 3

plugin1 := newReportingPlugin(
fakePlugin[uint]{reports: make([]ocr3types.ReportPlus[uint], 2)},
"123", "empty", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3PluginStatus,
"123", "empty", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3Sizes, promOCR3PluginStatus,
)
plugin2 := newReportingPlugin(
fakePlugin[bool]{reports: make([]ocr3types.ReportPlus[bool], 10)},
"solana", "different_plugin", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3PluginStatus,
fakePlugin[bool]{reports: make([]ocr3types.ReportPlus[bool], 10), observationSize: pluginObservationSize, outcomeSize: pluginOutcomeSize},
"solana", "different_plugin", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3Sizes, promOCR3PluginStatus,
)
plugin3 := newReportingPlugin(
fakePlugin[string]{err: errors.New("error")},
"1234", "empty", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3PluginStatus,
"1234", "empty", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3Sizes, promOCR3PluginStatus,
)

r1, err := plugin1.Reports(tests.Context(t), 1, nil)
Expand Down Expand Up @@ -64,20 +67,33 @@ func Test_ReportsGeneratedGauge(t *testing.T) {
require.NoError(t, plugin1.Close())
pluginHealth = testutil.ToFloat64(promOCR3PluginStatus.WithLabelValues("123", "empty", "abc"))
require.Equal(t, 0, int(pluginHealth))

iterations := 10
for i := 0; i < iterations; i++ {
_, err1 := plugin2.Outcome(tests.Context(t), ocr3types.OutcomeContext{}, nil, nil)
require.NoError(t, err1)
}
_, err1 := plugin2.Observation(tests.Context(t), ocr3types.OutcomeContext{}, nil)
require.NoError(t, err1)

outcomesLen := testutil.ToFloat64(promOCR3Sizes.WithLabelValues("solana", "different_plugin", "outcome"))
require.Equal(t, pluginOutcomeSize*iterations, int(outcomesLen))
observationLen := testutil.ToFloat64(promOCR3Sizes.WithLabelValues("solana", "different_plugin", "observation"))
require.Equal(t, pluginObservationSize, int(observationLen))
}

func Test_DurationHistograms(t *testing.T) {
plugin1 := newReportingPlugin(
fakePlugin[uint]{},
"123", "empty", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3PluginStatus,
"123", "empty", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3Sizes, promOCR3PluginStatus,
)
plugin2 := newReportingPlugin(
fakePlugin[uint]{err: errors.New("error")},
"123", "empty", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3PluginStatus,
"123", "empty", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3Sizes, promOCR3PluginStatus,
)
plugin3 := newReportingPlugin(
fakePlugin[uint]{},
"solana", "commit", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3PluginStatus,
"solana", "commit", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3Sizes, promOCR3PluginStatus,
)

for _, p := range []*reportingPlugin[uint]{plugin1, plugin2, plugin3} {
Expand All @@ -102,8 +118,10 @@ func Test_DurationHistograms(t *testing.T) {
}

type fakePlugin[RI any] struct {
reports []ocr3types.ReportPlus[RI]
err error
reports []ocr3types.ReportPlus[RI]
observationSize int
outcomeSize int
err error
}

func (f fakePlugin[RI]) Query(context.Context, ocr3types.OutcomeContext) (ocrtypes.Query, error) {
Expand All @@ -117,7 +135,7 @@ func (f fakePlugin[RI]) Observation(context.Context, ocr3types.OutcomeContext, o
if f.err != nil {
return nil, f.err
}
return ocrtypes.Observation{}, nil
return make([]byte, f.observationSize), nil
}

func (f fakePlugin[RI]) ValidateObservation(context.Context, ocr3types.OutcomeContext, ocrtypes.Query, ocrtypes.AttributedObservation) error {
Expand All @@ -132,7 +150,7 @@ func (f fakePlugin[RI]) Outcome(context.Context, ocr3types.OutcomeContext, ocrty
if f.err != nil {
return nil, f.err
}
return ocr3types.Outcome{}, nil
return make([]byte, f.outcomeSize), nil
}

func (f fakePlugin[RI]) Reports(context.Context, uint64, ocr3types.Outcome) ([]ocr3types.ReportPlus[RI], error) {
Expand Down
7 changes: 7 additions & 0 deletions core/services/ocr3/promwrapper/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ var (
},
[]string{"chainID", "plugin", "function", "success"},
)
promOCR3Sizes = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "ocr3_reporting_plugin_data_sizes",
Help: "Tracks the size of the data produced by OCR3 plugin in bytes (e.g. reports, observations etc.)",
},
[]string{"chainID", "plugin", "function"},
)
promOCR3PluginStatus = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "ocr3_reporting_plugin_status",
Expand Down
7 changes: 5 additions & 2 deletions core/services/workflows/syncer/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,13 @@ func (orm *orm) UpsertWorkflowSpecWithSecrets(
status = EXCLUDED.status,
binary_url = EXCLUDED.binary_url,
config_url = EXCLUDED.config_url,
secrets_id = EXCLUDED.secrets_id,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
spec_type = EXCLUDED.spec_type
spec_type = EXCLUDED.spec_type,
secrets_id = CASE
WHEN workflow_specs.secrets_id IS NULL THEN EXCLUDED.secrets_id
ELSE workflow_specs.secrets_id
END
RETURNING id
`

Expand Down
83 changes: 83 additions & 0 deletions core/services/workflows/syncer/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,86 @@ func Test_GetContentsByWorkflowID_SecretsProvidedButEmpty(t *testing.T) {
_, _, err = orm.GetContentsByWorkflowID(ctx, workflowID)
require.ErrorIs(t, err, ErrEmptySecrets)
}

func Test_UpsertWorkflowSpecWithSecrets(t *testing.T) {
db := pgtest.NewSqlxDB(t)
ctx := testutils.Context(t)
lggr := logger.TestLogger(t)
orm := &orm{ds: db, lggr: lggr}

t.Run("inserts new spec and new secrets", func(t *testing.T) {
giveURL := "https://example.com"
giveBytes, err := crypto.Keccak256([]byte(giveURL))
require.NoError(t, err)
giveHash := hex.EncodeToString(giveBytes)
giveContent := "some contents"

spec := &job.WorkflowSpec{
Workflow: "test_workflow",
Config: "test_config",
WorkflowID: "cid-123",
WorkflowOwner: "owner-123",
WorkflowName: "Test Workflow",
Status: job.WorkflowSpecStatusActive,
BinaryURL: "http://example.com/binary",
ConfigURL: "http://example.com/config",
CreatedAt: time.Now(),
SpecType: job.WASMFile,
}

_, err = orm.UpsertWorkflowSpecWithSecrets(ctx, spec, giveURL, giveHash, giveContent)
require.NoError(t, err)

// Verify the record exists in the database
var dbSpec job.WorkflowSpec
err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2`, spec.WorkflowOwner, spec.WorkflowName)
require.NoError(t, err)
require.Equal(t, spec.Workflow, dbSpec.Workflow)

// Verify the secrets exists in the database
contents, err := orm.GetContents(ctx, giveURL)
require.NoError(t, err)
require.Equal(t, giveContent, contents)
})

t.Run("updates existing spec and secrets", func(t *testing.T) {
giveURL := "https://example.com"
giveBytes, err := crypto.Keccak256([]byte(giveURL))
require.NoError(t, err)
giveHash := hex.EncodeToString(giveBytes)
giveContent := "some contents"

spec := &job.WorkflowSpec{
Workflow: "test_workflow",
Config: "test_config",
WorkflowID: "cid-123",
WorkflowOwner: "owner-123",
WorkflowName: "Test Workflow",
Status: job.WorkflowSpecStatusActive,
BinaryURL: "http://example.com/binary",
ConfigURL: "http://example.com/config",
CreatedAt: time.Now(),
SpecType: job.WASMFile,
}

_, err = orm.UpsertWorkflowSpecWithSecrets(ctx, spec, giveURL, giveHash, giveContent)
require.NoError(t, err)

// Update the status
spec.Status = job.WorkflowSpecStatusPaused

_, err = orm.UpsertWorkflowSpecWithSecrets(ctx, spec, giveURL, giveHash, "new contents")
require.NoError(t, err)

// Verify the record is updated in the database
var dbSpec job.WorkflowSpec
err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2`, spec.WorkflowOwner, spec.WorkflowName)
require.NoError(t, err)
require.Equal(t, spec.Config, dbSpec.Config)

// Verify the secrets is updated in the database
contents, err := orm.GetContents(ctx, giveURL)
require.NoError(t, err)
require.Equal(t, "new contents", contents)
})
}
6 changes: 6 additions & 0 deletions deployment/ccip/changeset/solana_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package changeset

// SolChainState holds a Go binding for all the currently deployed CCIP programs
// on a chain. If a binding is nil, it means here is no such contract on the chain.
type SolCCIPChainState struct {
}
3 changes: 2 additions & 1 deletion deployment/ccip/changeset/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ type CCIPOnChainState struct {
// Populated go bindings for the appropriate version for all contracts.
// We would hold 2 versions of each contract here. Once we upgrade we can phase out the old one.
// When generating bindings, make sure the package name corresponds to the version.
Chains map[uint64]CCIPChainState
Chains map[uint64]CCIPChainState
SolChains map[uint64]SolCCIPChainState
}

func (s CCIPOnChainState) View(chains []uint64) (map[string]view.ChainView, error) {
Expand Down
2 changes: 1 addition & 1 deletion deployment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type Environment struct {
Logger logger.Logger
ExistingAddresses AddressBook
Chains map[uint64]Chain
SolChains map[uint64]SolChain
NodeIDs []string
Offchain OffchainClient
GetContext func() context.Context
Expand Down Expand Up @@ -331,7 +332,6 @@ func NodeInfo(nodeIDs []string, oc NodeChainConfigsLister) (Nodes, error) {
Enabled: 1,
Ids: nodeIDs,
}

}
nodesFromJD, err := oc.ListNodes(context.Background(), &nodev1.ListNodesRequest{
Filter: filter,
Expand Down
5 changes: 5 additions & 0 deletions deployment/solana_chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package deployment

// SolChain represents a Solana chain.
type SolChain struct {
}
8 changes: 8 additions & 0 deletions integration-tests/smoke/ccip/ccip_rmn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
)

func TestRMN_TwoMessagesOnTwoLanesIncludingBatching(t *testing.T) {
t.Skip("This test is flaky and needs to be fixed")
runRmnTestCase(t, rmnTestCase{
name: "messages on two lanes including batching",
waitForExec: true,
Expand All @@ -58,6 +59,7 @@ func TestRMN_TwoMessagesOnTwoLanesIncludingBatching(t *testing.T) {
}

func TestRMN_MultipleMessagesOnOneLaneNoWaitForExec(t *testing.T) {
t.Skip("This test is flaky and needs to be fixed")
runRmnTestCase(t, rmnTestCase{
name: "multiple messages for rmn batching inspection and one rmn node down",
waitForExec: false, // do not wait for execution reports
Expand All @@ -80,6 +82,7 @@ func TestRMN_MultipleMessagesOnOneLaneNoWaitForExec(t *testing.T) {
}

func TestRMN_NotEnoughObservers(t *testing.T) {
t.Skip("This test is flaky and needs to be fixed")
runRmnTestCase(t, rmnTestCase{
name: "one message but not enough observers, should not get a commit report",
passIfNoCommitAfter: 15 * time.Second,
Expand All @@ -102,6 +105,7 @@ func TestRMN_NotEnoughObservers(t *testing.T) {
}

func TestRMN_DifferentSigners(t *testing.T) {
t.Skip("This test is flaky and needs to be fixed")
runRmnTestCase(t, rmnTestCase{
name: "different signers and different observers",
homeChainConfig: homeChainConfig{
Expand All @@ -126,6 +130,7 @@ func TestRMN_DifferentSigners(t *testing.T) {
}

func TestRMN_NotEnoughSigners(t *testing.T) {
t.Skip("This test is flaky and needs to be fixed")
runRmnTestCase(t, rmnTestCase{
name: "different signers and different observers",
passIfNoCommitAfter: 15 * time.Second,
Expand All @@ -151,6 +156,7 @@ func TestRMN_NotEnoughSigners(t *testing.T) {
}

func TestRMN_DifferentRmnNodesForDifferentChains(t *testing.T) {
t.Skip("This test is flaky and needs to be fixed")
runRmnTestCase(t, rmnTestCase{
name: "different rmn nodes support different chains",
waitForExec: false,
Expand All @@ -177,6 +183,7 @@ func TestRMN_DifferentRmnNodesForDifferentChains(t *testing.T) {
}

func TestRMN_TwoMessagesOneSourceChainCursed(t *testing.T) {
t.Skip("This test is flaky and needs to be fixed")
runRmnTestCase(t, rmnTestCase{
name: "two messages, one source chain is cursed",
passIfNoCommitAfter: 15 * time.Second,
Expand All @@ -203,6 +210,7 @@ func TestRMN_TwoMessagesOneSourceChainCursed(t *testing.T) {
}

func TestRMN_GlobalCurseTwoMessagesOnTwoLanes(t *testing.T) {
t.Skip("This test is flaky and needs to be fixed")
runRmnTestCase(t, rmnTestCase{
name: "global curse messages on two lanes",
waitForExec: false,
Expand Down

0 comments on commit 8cc2a3f

Please sign in to comment.