From 4bf0bf733a46887c095964fb0851db7536a1b341 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Fri, 13 Dec 2024 13:47:39 -0500 Subject: [PATCH] . --- .../relay/evm/mercury/mocks/pipeline.go | 1 + core/services/streams/delegate_test.go | 9 +-- core/services/streams/pipeline.go | 10 ++- core/services/streams/stream_registry.go | 2 - core/services/streams/stream_registry_test.go | 79 ++++++++++++------- 5 files changed, 62 insertions(+), 39 deletions(-) diff --git a/core/services/relay/evm/mercury/mocks/pipeline.go b/core/services/relay/evm/mercury/mocks/pipeline.go index a7183c9a037..1bc14b62c1a 100644 --- a/core/services/relay/evm/mercury/mocks/pipeline.go +++ b/core/services/relay/evm/mercury/mocks/pipeline.go @@ -41,3 +41,4 @@ func (m *MockTask) TaskTimeout() (time.Duration, bool) { return 0, false } func (m *MockTask) TaskRetries() uint32 { return 0 } func (m *MockTask) TaskMinBackoff() time.Duration { return 0 } func (m *MockTask) TaskMaxBackoff() time.Duration { return 0 } +func (m *MockTask) TaskStreamID() *int32 { return nil } diff --git a/core/services/streams/delegate_test.go b/core/services/streams/delegate_test.go index d177c977e1b..6489a339aa2 100644 --- a/core/services/streams/delegate_test.go +++ b/core/services/streams/delegate_test.go @@ -15,11 +15,11 @@ import ( type mockRegistry struct{} -func (m *mockRegistry) Get(streamID StreamID) (strm Stream, exists bool) { return } -func (m *mockRegistry) Register(streamID StreamID, spec pipeline.Spec, rrs ResultRunSaver) error { +func (m *mockRegistry) Get(streamID StreamID) (p Pipeline, exists bool) { return } +func (m *mockRegistry) Register(jb job.Job, rrs ResultRunSaver) error { return nil } -func (m *mockRegistry) Unregister(streamID StreamID) {} +func (m *mockRegistry) Unregister(int32) {} type mockDelegateConfig struct{} @@ -49,8 +49,7 @@ func Test_Delegate(t *testing.T) { strmSrv := srvs[1].(*StreamService) assert.Equal(t, registry, strmSrv.registry) - assert.Equal(t, StreamID(42), strmSrv.id) - assert.Equal(t, jb.PipelineSpec, strmSrv.spec) + assert.Equal(t, jb, strmSrv.jb) assert.NotNil(t, strmSrv.lggr) assert.Equal(t, srvs[0], strmSrv.rrs) }) diff --git a/core/services/streams/pipeline.go b/core/services/streams/pipeline.go index bead12d96b8..7ea5eb531f3 100644 --- a/core/services/streams/pipeline.go +++ b/core/services/streams/pipeline.go @@ -3,7 +3,6 @@ package streams import ( "context" "fmt" - "sync" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" @@ -25,7 +24,6 @@ type Pipeline interface { } type multiStreamPipeline struct { - sync.RWMutex lggr logger.Logger spec pipeline.Spec runner Runner @@ -76,7 +74,13 @@ func newMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs R }, }) - return &multiStreamPipeline{sync.RWMutex{}, lggr.Named("MultiStreamPipeline").With("spec.ID", spec.ID, "jobID", spec.JobID, "jobName", spec.JobName, "jobType", spec.JobType), spec, runner, rrs, streamIDs, vars}, nil + return &multiStreamPipeline{ + lggr.Named("MultiStreamPipeline").With("spec.ID", spec.ID, "jobID", spec.JobID, "jobName", spec.JobName, "jobType", spec.JobType), + spec, + runner, + rrs, + streamIDs, + vars}, nil } func (s *multiStreamPipeline) Run(ctx context.Context) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) { diff --git a/core/services/streams/stream_registry.go b/core/services/streams/stream_registry.go index 3fadd0bac12..6c76fe101e9 100644 --- a/core/services/streams/stream_registry.go +++ b/core/services/streams/stream_registry.go @@ -10,8 +10,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/job" ) -// TODO: Rename, this is actually a PipelineRegistry (? is it ?) - // alias for easier refactoring type StreamID = llo.StreamID diff --git a/core/services/streams/stream_registry_test.go b/core/services/streams/stream_registry_test.go index 738b68f5d4d..fa9486728f8 100644 --- a/core/services/streams/stream_registry_test.go +++ b/core/services/streams/stream_registry_test.go @@ -5,22 +5,29 @@ import ( "testing" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -type mockStream struct { +var _ Pipeline = &mockPipeline{} + +type mockPipeline struct { run *pipeline.Run trrs pipeline.TaskRunResults err error } -func (m *mockStream) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { +func (m *mockPipeline) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { return m.run, m.trrs, m.err } +func (m *mockPipeline) StreamIDs() []StreamID { + return nil +} + func Test_Registry(t *testing.T) { lggr := logger.TestLogger(t) runner := &mockRunner{} @@ -28,21 +35,21 @@ func Test_Registry(t *testing.T) { t.Run("Get", func(t *testing.T) { sr := newRegistry(lggr, runner) - sr.streams[1] = &mockStream{run: &pipeline.Run{ID: 1}} - sr.streams[2] = &mockStream{run: &pipeline.Run{ID: 2}} - sr.streams[3] = &mockStream{run: &pipeline.Run{ID: 3}} + sr.pipelines[1] = &mockPipeline{run: &pipeline.Run{ID: 1}} + sr.pipelines[2] = &mockPipeline{run: &pipeline.Run{ID: 2}} + sr.pipelines[3] = &mockPipeline{run: &pipeline.Run{ID: 3}} v, exists := sr.Get(1) assert.True(t, exists) - assert.Equal(t, sr.streams[1], v) + assert.Equal(t, sr.pipelines[1], v) v, exists = sr.Get(2) assert.True(t, exists) - assert.Equal(t, sr.streams[2], v) + assert.Equal(t, sr.pipelines[2], v) v, exists = sr.Get(3) assert.True(t, exists) - assert.Equal(t, sr.streams[3], v) + assert.Equal(t, sr.pipelines[3], v) v, exists = sr.Get(4) assert.Nil(t, v) @@ -51,56 +58,70 @@ func Test_Registry(t *testing.T) { t.Run("Register", func(t *testing.T) { sr := newRegistry(lggr, runner) - t.Run("registers new stream", func(t *testing.T) { - assert.Len(t, sr.streams, 0) - err := sr.Register(1, pipeline.Spec{ID: 32, DotDagSource: "source"}, nil) + t.Run("registers new pipeline with multiple stream IDs", func(t *testing.T) { + assert.Len(t, sr.pipelines, 0) + // err := sr.Register(job.Job{PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: "source"}}, nil) + // TODO: what if the dag is unparseable? + // err := sr.Register(1, pipeline.Spec{ID: 32, DotDagSource: "source"}, nil) + err := sr.Register(job.Job{PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: ` +result1 [type=memo value="900.0022"]; +multiply2 [type=multiply times=1 streamID=1 index=0]; // force conversion to decimal +result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"]; +result2_parse [type=jsonparse path="result" streamID=2 index=1]; +result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"]; +result3_parse [type=jsonparse path="result"]; +multiply3 [type=multiply times=1 streamID=3 index=2]; // force conversion to decimal +result1 -> multiply2; +result2 -> result2_parse; +result3 -> result3_parse -> multiply3; +`}}, nil) require.NoError(t, err) - assert.Len(t, sr.streams, 1) + assert.Len(t, sr.pipelines, 1) v, exists := sr.Get(1) require.True(t, exists) - strm := v.(*stream) - assert.Equal(t, StreamID(1), strm.id) - assert.Equal(t, int32(32), strm.spec.ID) + msp := v.(*multiStreamPipeline) + assert.Equal(t, "foo", msp.StreamIDs()) + assert.Equal(t, int32(32), msp.spec.ID) }) t.Run("errors when attempt to re-register a stream with an existing ID", func(t *testing.T) { - assert.Len(t, sr.streams, 1) + assert.Len(t, sr.pipelines, 1) err := sr.Register(1, pipeline.Spec{ID: 33, DotDagSource: "source"}, nil) require.Error(t, err) - assert.Len(t, sr.streams, 1) + assert.Len(t, sr.pipelines, 1) assert.EqualError(t, err, "stream already registered for id: 1") v, exists := sr.Get(1) require.True(t, exists) - strm := v.(*stream) - assert.Equal(t, StreamID(1), strm.id) - assert.Equal(t, int32(32), strm.spec.ID) + msp := v.(*multiStreamPipeline) + assert.Equal(t, StreamID(1), msp.id) + assert.Equal(t, int32(32), msp.spec.ID) }) }) t.Run("Unregister", func(t *testing.T) { sr := newRegistry(lggr, runner) - sr.streams[1] = &mockStream{run: &pipeline.Run{ID: 1}} - sr.streams[2] = &mockStream{run: &pipeline.Run{ID: 2}} - sr.streams[3] = &mockStream{run: &pipeline.Run{ID: 3}} + sr.pipelines[1] = &mockPipeline{run: &pipeline.Run{ID: 1}} + sr.pipelines[2] = &mockPipeline{run: &pipeline.Run{ID: 2}} + sr.pipelines[3] = &mockPipeline{run: &pipeline.Run{ID: 3}} t.Run("unregisters a stream", func(t *testing.T) { - assert.Len(t, sr.streams, 3) + assert.Len(t, sr.pipelines, 3) sr.Unregister(1) - assert.Len(t, sr.streams, 2) - _, exists := sr.streams[1] + assert.Len(t, sr.pipelines, 2) + _, exists := sr.pipelines[1] assert.False(t, exists) }) t.Run("no effect when unregistering a non-existent stream", func(t *testing.T) { - assert.Len(t, sr.streams, 2) + assert.Len(t, sr.pipelines, 2) sr.Unregister(1) - assert.Len(t, sr.streams, 2) - _, exists := sr.streams[1] + assert.Len(t, sr.pipelines, 2) + _, exists := sr.pipelines[1] assert.False(t, exists) }) })