From 580fcf6fcca3d5f31cccd0b522c9fab981f4634f Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Tue, 21 Jan 2025 11:44:19 +0000 Subject: [PATCH] Refactor --- core/services/workflows/delegate.go | 12 +++--- core/services/workflows/engine.go | 26 +++++------- core/services/workflows/engine_test.go | 13 +++--- core/services/workflows/models.go | 32 +++++++++++++-- core/services/workflows/syncer/handler.go | 40 +++++++++++++------ .../services/workflows/syncer/handler_test.go | 17 ++++---- 6 files changed, 88 insertions(+), 52 deletions(-) diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index 9e50f5ec092..678f88dcfb6 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -60,11 +60,13 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser } cfg := Config{ - Lggr: d.logger, - Workflow: sdkSpec, - WorkflowID: spec.WorkflowSpec.WorkflowID, - WorkflowOwner: spec.WorkflowSpec.WorkflowOwner, - WorkflowName: spec.WorkflowSpec.WorkflowName, + Lggr: d.logger, + Workflow: sdkSpec, + WorkflowID: spec.WorkflowSpec.WorkflowID, + WorkflowOwner: spec.WorkflowSpec.WorkflowOwner, + WorkflowName: defaultName{ + name: spec.WorkflowSpec.WorkflowName, + }, Registry: d.registry, Store: d.store, Config: config, diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 5a712bcbfae..a526e0d84ab 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -436,11 +436,11 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig Metadata: capabilities.RequestMetadata{ WorkflowID: e.workflow.id, WorkflowOwner: e.workflow.owner, - WorkflowName: e.workflow.hexName, + WorkflowName: e.workflow.name.Hex(), WorkflowDonID: e.localNode.WorkflowDON.ID, WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion, ReferenceID: t.Ref, - DecodedWorkflowName: e.workflow.name, + DecodedWorkflowName: e.workflow.name.String(), }, Config: t.config.Load(), TriggerID: triggerID, @@ -869,7 +869,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value // registry (for capability-level configuration). It doesn't perform any caching of the config values, since // the two registries perform their own caching. func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) { - secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.hexName, e.workflow.name, e.workflow.id) + secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.name.Hex(), e.workflow.name.String(), e.workflow.id) if err != nil { return nil, fmt.Errorf("failed to fetch secrets: %w", err) } @@ -961,11 +961,11 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe WorkflowID: msg.state.WorkflowID, WorkflowExecutionID: msg.state.ExecutionID, WorkflowOwner: e.workflow.owner, - WorkflowName: e.workflow.hexName, + WorkflowName: e.workflow.name.Hex(), WorkflowDonID: e.localNode.WorkflowDON.ID, WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion, ReferenceID: msg.stepRef, - DecodedWorkflowName: e.workflow.name, + DecodedWorkflowName: e.workflow.name.String(), }, } @@ -988,10 +988,10 @@ func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability, tr WorkflowID: e.workflow.id, WorkflowDonID: e.localNode.WorkflowDON.ID, WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion, - WorkflowName: e.workflow.hexName, + WorkflowName: e.workflow.name.Hex(), WorkflowOwner: e.workflow.owner, ReferenceID: t.Ref, - DecodedWorkflowName: e.workflow.name, + DecodedWorkflowName: e.workflow.name.String(), }, TriggerID: generateTriggerId(e.workflow.id, triggerIdx), Config: t.config.Load(), @@ -1194,8 +1194,7 @@ type Config struct { Workflow sdk.WorkflowSpec WorkflowID string WorkflowOwner string - WorkflowName string // Full human-readable workflow name. Intended for metrics and logging. - WorkflowHexName string // The Workflow Name in an on-chain format, which has requirements of being 10 bytes long, hex encoded (i.e. 20 hex characters long). If not provided, we'll derive it from the workflowName. + WorkflowName WorkflowNamer Lggr logger.Logger Registry core.CapabilitiesRegistry MaxWorkerLimit int @@ -1289,7 +1288,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) { return nil, fmt.Errorf("could not initialize monitoring resources: %w", err) } - cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName) + cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String()) workflow, err := Parse(cfg.Workflow) if err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to parse workflow: %s", err), cfg.Lggr) @@ -1300,15 +1299,10 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) { workflow.owner = cfg.WorkflowOwner workflow.name = cfg.WorkflowName - workflow.hexName = hex.EncodeToString([]byte(cfg.WorkflowName)) - if cfg.WorkflowHexName != "" { - workflow.hexName = cfg.WorkflowHexName - } - engine = &Engine{ cma: cma, logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID), - metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName), *em}, + metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String()), *em}, registry: cfg.Registry, workflow: workflow, secretsFetcher: cfg.SecretsFetcher, diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 839e3680f72..2e596180416 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -166,12 +166,13 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec reg.SetLocalRegistry(&testConfigProvider{}) cfg := Config{ - WorkflowID: testWorkflowId, - Lggr: logger.TestLogger(t), - Registry: reg, - Workflow: sdkSpec, - maxRetries: 1, - retryMs: 100, + WorkflowID: testWorkflowId, + Lggr: logger.TestLogger(t), + Registry: reg, + Workflow: sdkSpec, + WorkflowName: defaultName{}, + maxRetries: 1, + retryMs: 100, afterInit: func(success bool) { if success { close(initSuccessful) diff --git a/core/services/workflows/models.go b/core/services/workflows/models.go index ec149bf6371..80b1d477d0a 100644 --- a/core/services/workflows/models.go +++ b/core/services/workflows/models.go @@ -1,6 +1,7 @@ package workflows import ( + "encoding/hex" "errors" "fmt" "sync/atomic" @@ -14,16 +15,39 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/workflows" ) +type WorkflowNamer interface { + // Must be 10 bytes, hex-encoded (i.e. 20 characters long) + // Used in the metadata we send onchain and for authorizing + // the workflow with the consumer + Hex() string + + // Human-readable version of the name + // This has no restriction on size, and is only + // used for logging and metrics. + String() string +} + +type defaultName struct { + name string +} + +func (d defaultName) Hex() string { + return hex.EncodeToString([]byte(d.name)) +} + +func (d defaultName) String() string { + return d.name +} + // workflow is a directed graph of nodes, where each node is a step. // // triggers are special steps that are stored separately, they're // treated differently due to their nature of being the starting // point of a workflow. type workflow struct { - id string - owner string - hexName string - name string + id string + owner string + name WorkflowNamer graph.Graph[string, *step] triggers []*triggerCapability diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 3955ac95bf6..bf9e5f329f7 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -128,7 +128,7 @@ func newLastFetchedAtMap() *lastFetchedAtMap { } } -type engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) +type engineFactoryFn func(ctx context.Context, wfid string, owner string, name workflows.WorkflowNamer, config []byte, binary []byte) (services.Service, error) // eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding // method that handles the event. @@ -397,6 +397,23 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { } } +type workflowName struct { + name string +} + +func (w workflowName) String() string { + return w.name +} + +func (w workflowName) Hex() string { + // Internal workflow names must not exceed 10 bytes for workflow engine and on-chain use. + // A name is used internally that is first hashed to avoid collisions, + // hex encoded to ensure UTF8 encoding, then truncated to 10 bytes. + truncatedName := pkgworkflows.HashTruncateName(w.name) + hexName := hex.EncodeToString([]byte(truncatedName)) + return hexName +} + // workflowRegisteredEvent handles the WorkflowRegisteredEvent event type. func (h *eventHandler) workflowRegisteredEvent( ctx context.Context, @@ -467,16 +484,14 @@ func (h *eventHandler) workflowRegisteredEvent( return nil } - truncatedName := pkgworkflows.HashTruncateName(payload.WorkflowName) - hexName := hex.EncodeToString([]byte(truncatedName)) - // If status == active, start a new WorkflowEngine instance, and add it to local engine registry engine, err := h.engineFactory( ctx, wfID, owner, - payload.WorkflowName, - hexName, + workflowName{ + name: payload.WorkflowName, + }, config, decodedBinary, ) @@ -530,7 +545,7 @@ func (h *eventHandler) getWorkflowArtifacts( return decodedBinary, config, nil } -func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) { +func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name workflows.WorkflowNamer, config []byte, binary []byte) (services.Service, error) { moduleConfig := &host.ModuleConfig{Logger: h.lggr, Labeler: h.emitter} sdkSpec, err := host.GetWorkflowSpec(ctx, moduleConfig, binary, config) if err != nil { @@ -546,12 +561,11 @@ func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner str // Internal workflow names must not exceed 10 bytes for workflow engine and on-chain use. // A name is used internally that is first hashed to avoid collisions, // hex encoded to ensure UTF8 encoding, then truncated to 10 bytes. - WorkflowHexName: hexName, - Registry: h.capRegistry, - Store: h.workflowStore, - Config: config, - Binary: binary, - SecretsFetcher: h, + Registry: h.capRegistry, + Store: h.workflowStore, + Config: config, + Binary: binary, + SecretsFetcher: h, } return workflows.NewEngine(ctx, cfg) } diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index e7713db2d0e..3d4bb66616b 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -22,6 +22,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows" wfstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer/mocks" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" @@ -233,7 +234,7 @@ func Test_workflowRegisteredHandler(t *testing.T) { configURL: {Body: config, Err: nil}, secretsURL: {Body: []byte("secrets"), Err: nil}, }), - engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) { + engineFactoryFn: func(ctx context.Context, wfid string, owner string, name workflows.WorkflowNamer, config []byte, binary []byte) (services.Service, error) { return &mockEngine{}, nil }, GiveConfig: config, @@ -262,13 +263,13 @@ func Test_workflowRegisteredHandler(t *testing.T) { configURL: {Body: config, Err: nil}, secretsURL: {Body: []byte("secrets"), Err: nil}, }), - engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) { - if _, err := hex.DecodeString(hexName); err != nil { + engineFactoryFn: func(ctx context.Context, wfid string, owner string, name workflows.WorkflowNamer, config []byte, binary []byte) (services.Service, error) { + if _, err := hex.DecodeString(name.Hex()); err != nil { return nil, fmt.Errorf("invalid workflow name: %w", err) } - want := hex.EncodeToString([]byte(pkgworkflows.HashTruncateName(workflowName))) - if want != hexName { - return nil, fmt.Errorf("invalid workflow name: doesn't match, got %s, want %s", hexName, want) + want := hex.EncodeToString([]byte(pkgworkflows.HashTruncateName(name.String()))) + if want != name.Hex() { + return nil, fmt.Errorf("invalid workflow name: doesn't match, got %s, want %s", name.Hex(), want) } return &mockEngine{}, nil }, @@ -298,7 +299,7 @@ func Test_workflowRegisteredHandler(t *testing.T) { configURL: {Body: config, Err: nil}, secretsURL: {Body: []byte("secrets"), Err: nil}, }), - engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) { + engineFactoryFn: func(ctx context.Context, wfid string, owner string, name workflows.WorkflowNamer, config []byte, binary []byte) (services.Service, error) { return &mockEngine{StartErr: assert.AnError}, nil }, GiveConfig: config, @@ -461,7 +462,7 @@ type testCase struct { fetcher FetcherFunc Event func([]byte) WorkflowRegistryWorkflowRegisteredV1 validationFn func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) - engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) + engineFactoryFn func(ctx context.Context, wfid string, owner string, name workflows.WorkflowNamer, config []byte, binary []byte) (services.Service, error) } func testRunningWorkflow(t *testing.T, tc testCase) {