Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier committed Jan 21, 2025
1 parent c50dfcd commit e353a58
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper"
coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"

Expand Down Expand Up @@ -420,7 +420,7 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyPaused(t *testing.T) {
require.NoError(t, err)

from := [20]byte(backendTH.ContractsOwner.From)
id, err := workflows.GenerateWorkflowID(from[:], "test-wf", []byte(wantContents), []byte(""), "")
id, err := pkgworkflows.GenerateWorkflowID(from[:], "test-wf", []byte(wantContents), []byte(""), "")

Check failure on line 423 in core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

undefined: pkgworkflows

Check failure on line 423 in core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go

View workflow job for this annotation

GitHub Actions / GolangCI Lint (.)

undefined: pkgworkflows

Check failure on line 423 in core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

undefined: pkgworkflows
require.NoError(t, err)
giveWorkflow.ID = id

Expand Down Expand Up @@ -481,7 +481,7 @@ func (m *mockService) Name() string { return "svc" }

type mockEngineFactory struct{}

func (m *mockEngineFactory) new(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) {
func (m *mockEngineFactory) new(ctx context.Context, wfid string, owner string, name workflows.WorkflowNamer, config []byte, binary []byte) (services.Service, error) {
return &mockService{}, nil
}

Expand Down Expand Up @@ -517,7 +517,7 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) {
require.NoError(t, err)

from := [20]byte(backendTH.ContractsOwner.From)
id, err := workflows.GenerateWorkflowID(from[:], "test-wf", []byte(wantContents), []byte(""), "")
id, err := pkgworkflows.GenerateWorkflowID(from[:], "test-wf", []byte(wantContents), []byte(""), "")

Check failure on line 520 in core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

undefined: pkgworkflows

Check failure on line 520 in core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go

View workflow job for this annotation

GitHub Actions / GolangCI Lint (.)

undefined: pkgworkflows (typecheck)

Check failure on line 520 in core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

undefined: pkgworkflows
require.NoError(t, err)
giveWorkflow.ID = id

Expand Down
12 changes: 7 additions & 5 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,13 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
}

cfg := Config{
Lggr: d.logger,
Workflow: sdkSpec,
WorkflowID: spec.WorkflowSpec.WorkflowID,
WorkflowOwner: spec.WorkflowSpec.WorkflowOwner,
WorkflowName: spec.WorkflowSpec.WorkflowName,
Lggr: d.logger,
Workflow: sdkSpec,
WorkflowID: spec.WorkflowSpec.WorkflowID,
WorkflowOwner: spec.WorkflowSpec.WorkflowOwner,
WorkflowName: defaultName{
name: spec.WorkflowSpec.WorkflowName,
},
Registry: d.registry,
Store: d.store,
Config: config,
Expand Down
26 changes: 10 additions & 16 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,11 +436,11 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig
Metadata: capabilities.RequestMetadata{
WorkflowID: e.workflow.id,
WorkflowOwner: e.workflow.owner,
WorkflowName: e.workflow.hexName,
WorkflowName: e.workflow.name.Hex(),
WorkflowDonID: e.localNode.WorkflowDON.ID,
WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion,
ReferenceID: t.Ref,
DecodedWorkflowName: e.workflow.name,
DecodedWorkflowName: e.workflow.name.String(),
},
Config: t.config.Load(),
TriggerID: triggerID,
Expand Down Expand Up @@ -869,7 +869,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value
// registry (for capability-level configuration). It doesn't perform any caching of the config values, since
// the two registries perform their own caching.
func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) {
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.hexName, e.workflow.name, e.workflow.id)
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.name.Hex(), e.workflow.name.String(), e.workflow.id)
if err != nil {
return nil, fmt.Errorf("failed to fetch secrets: %w", err)
}
Expand Down Expand Up @@ -961,11 +961,11 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
WorkflowID: msg.state.WorkflowID,
WorkflowExecutionID: msg.state.ExecutionID,
WorkflowOwner: e.workflow.owner,
WorkflowName: e.workflow.hexName,
WorkflowName: e.workflow.name.Hex(),
WorkflowDonID: e.localNode.WorkflowDON.ID,
WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion,
ReferenceID: msg.stepRef,
DecodedWorkflowName: e.workflow.name,
DecodedWorkflowName: e.workflow.name.String(),
},
}

Expand All @@ -988,10 +988,10 @@ func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability, tr
WorkflowID: e.workflow.id,
WorkflowDonID: e.localNode.WorkflowDON.ID,
WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion,
WorkflowName: e.workflow.hexName,
WorkflowName: e.workflow.name.Hex(),
WorkflowOwner: e.workflow.owner,
ReferenceID: t.Ref,
DecodedWorkflowName: e.workflow.name,
DecodedWorkflowName: e.workflow.name.String(),
},
TriggerID: generateTriggerId(e.workflow.id, triggerIdx),
Config: t.config.Load(),
Expand Down Expand Up @@ -1194,8 +1194,7 @@ type Config struct {
Workflow sdk.WorkflowSpec
WorkflowID string
WorkflowOwner string
WorkflowName string // Full human-readable workflow name. Intended for metrics and logging.
WorkflowHexName string // The Workflow Name in an on-chain format, which has requirements of being 10 bytes long, hex encoded (i.e. 20 hex characters long). If not provided, we'll derive it from the workflowName.
WorkflowName WorkflowNamer
Lggr logger.Logger
Registry core.CapabilitiesRegistry
MaxWorkerLimit int
Expand Down Expand Up @@ -1289,7 +1288,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
return nil, fmt.Errorf("could not initialize monitoring resources: %w", err)
}

cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName)
cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String())
workflow, err := Parse(cfg.Workflow)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to parse workflow: %s", err), cfg.Lggr)
Expand All @@ -1300,15 +1299,10 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
workflow.owner = cfg.WorkflowOwner
workflow.name = cfg.WorkflowName

workflow.hexName = hex.EncodeToString([]byte(cfg.WorkflowName))
if cfg.WorkflowHexName != "" {
workflow.hexName = cfg.WorkflowHexName
}

engine = &Engine{
cma: cma,
logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName), *em},
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String()), *em},
registry: cfg.Registry,
workflow: workflow,
secretsFetcher: cfg.SecretsFetcher,
Expand Down
13 changes: 7 additions & 6 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,13 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec

reg.SetLocalRegistry(&testConfigProvider{})
cfg := Config{
WorkflowID: testWorkflowId,
Lggr: logger.TestLogger(t),
Registry: reg,
Workflow: sdkSpec,
maxRetries: 1,
retryMs: 100,
WorkflowID: testWorkflowId,
Lggr: logger.TestLogger(t),
Registry: reg,
Workflow: sdkSpec,
WorkflowName: defaultName{},
maxRetries: 1,
retryMs: 100,
afterInit: func(success bool) {
if success {
close(initSuccessful)
Expand Down
32 changes: 28 additions & 4 deletions core/services/workflows/models.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package workflows

import (
"encoding/hex"
"errors"
"fmt"
"sync/atomic"
Expand All @@ -14,16 +15,39 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/workflows"
)

type WorkflowNamer interface {
// Must be 10 bytes, hex-encoded (i.e. 20 characters long)
// Used in the metadata we send onchain and for authorizing
// the workflow with the consumer
Hex() string

// Human-readable version of the name
// This has no restriction on size, and is only
// used for logging and metrics.
String() string
}

type defaultName struct {
name string
}

func (d defaultName) Hex() string {
return hex.EncodeToString([]byte(d.name))
}

func (d defaultName) String() string {
return d.name
}

// workflow is a directed graph of nodes, where each node is a step.
//
// triggers are special steps that are stored separately, they're
// treated differently due to their nature of being the starting
// point of a workflow.
type workflow struct {
id string
owner string
hexName string
name string
id string
owner string
name WorkflowNamer
graph.Graph[string, *step]

triggers []*triggerCapability
Expand Down
40 changes: 27 additions & 13 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func newLastFetchedAtMap() *lastFetchedAtMap {
}
}

type engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error)
type engineFactoryFn func(ctx context.Context, wfid string, owner string, name workflows.WorkflowNamer, config []byte, binary []byte) (services.Service, error)

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
// method that handles the event.
Expand Down Expand Up @@ -397,6 +397,23 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
}
}

type workflowName struct {
name string
}

func (w workflowName) String() string {
return w.name
}

func (w workflowName) Hex() string {
// Internal workflow names must not exceed 10 bytes for workflow engine and on-chain use.
// A name is used internally that is first hashed to avoid collisions,
// hex encoded to ensure UTF8 encoding, then truncated to 10 bytes.
truncatedName := pkgworkflows.HashTruncateName(w.name)
hexName := hex.EncodeToString([]byte(truncatedName))
return hexName
}

// workflowRegisteredEvent handles the WorkflowRegisteredEvent event type.
func (h *eventHandler) workflowRegisteredEvent(
ctx context.Context,
Expand Down Expand Up @@ -467,16 +484,14 @@ func (h *eventHandler) workflowRegisteredEvent(
return nil
}

truncatedName := pkgworkflows.HashTruncateName(payload.WorkflowName)
hexName := hex.EncodeToString([]byte(truncatedName))

// If status == active, start a new WorkflowEngine instance, and add it to local engine registry
engine, err := h.engineFactory(
ctx,
wfID,
owner,
payload.WorkflowName,
hexName,
workflowName{
name: payload.WorkflowName,
},
config,
decodedBinary,
)
Expand Down Expand Up @@ -530,7 +545,7 @@ func (h *eventHandler) getWorkflowArtifacts(
return decodedBinary, config, nil
}

func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) {
func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name workflows.WorkflowNamer, config []byte, binary []byte) (services.Service, error) {
moduleConfig := &host.ModuleConfig{Logger: h.lggr, Labeler: h.emitter}
sdkSpec, err := host.GetWorkflowSpec(ctx, moduleConfig, binary, config)
if err != nil {
Expand All @@ -546,12 +561,11 @@ func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner str
// Internal workflow names must not exceed 10 bytes for workflow engine and on-chain use.
// A name is used internally that is first hashed to avoid collisions,
// hex encoded to ensure UTF8 encoding, then truncated to 10 bytes.
WorkflowHexName: hexName,
Registry: h.capRegistry,
Store: h.workflowStore,
Config: config,
Binary: binary,
SecretsFetcher: h,
Registry: h.capRegistry,
Store: h.workflowStore,
Config: config,
Binary: binary,
SecretsFetcher: h,
}
return workflows.NewEngine(ctx, cfg)
}
Expand Down
17 changes: 9 additions & 8 deletions core/services/workflows/syncer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
wfstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer/mocks"
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"
Expand Down Expand Up @@ -233,7 +234,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) {
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name workflows.WorkflowNamer, config []byte, binary []byte) (services.Service, error) {
return &mockEngine{}, nil
},
GiveConfig: config,
Expand Down Expand Up @@ -262,13 +263,13 @@ func Test_workflowRegisteredHandler(t *testing.T) {
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) {
if _, err := hex.DecodeString(hexName); err != nil {
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name workflows.WorkflowNamer, config []byte, binary []byte) (services.Service, error) {
if _, err := hex.DecodeString(name.Hex()); err != nil {
return nil, fmt.Errorf("invalid workflow name: %w", err)
}
want := hex.EncodeToString([]byte(pkgworkflows.HashTruncateName(workflowName)))
if want != hexName {
return nil, fmt.Errorf("invalid workflow name: doesn't match, got %s, want %s", hexName, want)
want := hex.EncodeToString([]byte(pkgworkflows.HashTruncateName(name.String())))
if want != name.Hex() {
return nil, fmt.Errorf("invalid workflow name: doesn't match, got %s, want %s", name.Hex(), want)
}
return &mockEngine{}, nil
},
Expand Down Expand Up @@ -298,7 +299,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) {
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name workflows.WorkflowNamer, config []byte, binary []byte) (services.Service, error) {
return &mockEngine{StartErr: assert.AnError}, nil
},
GiveConfig: config,
Expand Down Expand Up @@ -461,7 +462,7 @@ type testCase struct {
fetcher FetcherFunc
Event func([]byte) WorkflowRegistryWorkflowRegisteredV1
validationFn func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string)
engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error)
engineFactoryFn func(ctx context.Context, wfid string, owner string, name workflows.WorkflowNamer, config []byte, binary []byte) (services.Service, error)
}

func testRunningWorkflow(t *testing.T, tc testCase) {
Expand Down

0 comments on commit e353a58

Please sign in to comment.