diff --git a/.mockery.yaml b/.mockery.yaml index 8b13bd649..3d783094c 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -41,9 +41,11 @@ packages: github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller: interfaces: RPCClient: + WorkerGroup: ORM: config: inpackage: True dir: "pkg/solana/logpoller" filename: mock_orm.go mockname: mockORM + diff --git a/pkg/solana/logpoller/job_get_slots_for_addr.go b/pkg/solana/logpoller/job_get_slots_for_addr.go index 754501395..da28d1581 100644 --- a/pkg/solana/logpoller/job_get_slots_for_addr.go +++ b/pkg/solana/logpoller/job_get_slots_for_addr.go @@ -21,10 +21,10 @@ type getSlotsForAddressJob struct { storeSlot func(slot uint64) done chan struct{} - workers *worker.Group + workers WorkerGroup } -func newGetSlotsForAddress(client RPCClient, workers *worker.Group, storeSlot func(uint64), address PublicKey, from, to uint64) *getSlotsForAddressJob { +func newGetSlotsForAddress(client RPCClient, workers WorkerGroup, storeSlot func(uint64), address PublicKey, from, to uint64) *getSlotsForAddressJob { return &getSlotsForAddressJob{ address: address, client: client, @@ -80,30 +80,29 @@ func (f *getSlotsForAddressJob) run(ctx context.Context) (bool, error) { } // signatures ordered from newest to oldest, defined in the Solana RPC docs - lowestSlot := sigs[0].Slot for _, sig := range sigs { - f.beforeSig = sig.Signature - if sig.Slot >= lowestSlot { - continue - } - - lowestSlot = sig.Slot // RPC may return slots that are higher than requested. Skip them to simplify mental model. if sig.Slot > f.to { continue } - f.storeSlot(sig.Slot) - if sig.Slot <= f.from { + + if sig.Slot < f.from { return true, nil } + + // no need to fetch slot, if transaction failed + if sig.Err == nil { + f.storeSlot(sig.Slot) + } } + oldestSig := sigs[len(sigs)-1] // to ensure we do not overload RPC perform next call as a separate job err = f.workers.Do(ctx, &getSlotsForAddressJob{ address: f.address, - beforeSig: f.beforeSig, + beforeSig: oldestSig.Signature, from: f.from, - to: lowestSlot, + to: oldestSig.Slot, client: f.client, storeSlot: f.storeSlot, done: f.done, diff --git a/pkg/solana/logpoller/job_get_slots_for_addr_test.go b/pkg/solana/logpoller/job_get_slots_for_addr_test.go new file mode 100644 index 000000000..48c66060f --- /dev/null +++ b/pkg/solana/logpoller/job_get_slots_for_addr_test.go @@ -0,0 +1,121 @@ +package logpoller + +import ( + "context" + "errors" + "testing" + + "github.com/gagliardetto/solana-go" + "github.com/gagliardetto/solana-go/rpc" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + + "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/mocks" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/worker" +) + +func TestGetSlotsForAddressJob(t *testing.T) { + sig, err := solana.SignatureFromBase58("4VJEi7D9ia2R4L6xgPE7bKTtNAtJ2KGHTtq1VEztEMtpcevGPzGpyvnm6EgkMCPhSQTAQ9XwdyqVYzqbf35zJyF") + require.NoError(t, err) + rawAddr, err := solana.PublicKeyFromBase58("Cv4T27XbjVoKUYwP72NQQanvZeA7W4YF9L4EnYT9kx5o") + require.NoError(t, err) + address := PublicKey(rawAddr) + const from = uint64(10) + const to = uint64(20) + t.Run("String representation contains all details", func(t *testing.T) { + job := &getSlotsForAddressJob{address: address, from: from, to: to, beforeSig: sig} + require.Equal(t, "getSlotsForAddress: Cv4T27XbjVoKUYwP72NQQanvZeA7W4YF9L4EnYT9kx5o, from: 10, to: 20, beforeSig: 4VJEi7D9ia2R4L6xgPE7bKTtNAtJ2KGHTtq1VEztEMtpcevGPzGpyvnm6EgkMCPhSQTAQ9XwdyqVYzqbf35zJyF", job.String()) + }) + t.Run("Returns error if RPC request failed", func(t *testing.T) { + client := mocks.NewRPCClient(t) + expectedError := errors.New("rpc error") + client.EXPECT().GetSignaturesForAddressWithOpts(mock.Anything, mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, key solana.PublicKey, opts *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error) { + require.Equal(t, address.String(), key.String()) + require.NotNil(t, opts) + require.True(t, opts.Before.IsZero()) + require.NotNil(t, opts.MinContextSlot) + require.Equal(t, to, *opts.MinContextSlot) + return nil, expectedError + }).Once() + job := newGetSlotsForAddress(client, nil, nil, address, from, to) + err := job.Run(tests.Context(t)) + require.ErrorIs(t, err, expectedError) + }) + requireJobIsDone := func(t *testing.T, done <-chan struct{}, msg string) { + select { + case <-done: + default: + require.Fail(t, msg) + } + } + t.Run("Completes successfully if there is no signatures", func(t *testing.T) { + client := mocks.NewRPCClient(t) + client.EXPECT().GetSignaturesForAddressWithOpts(mock.Anything, mock.Anything, mock.Anything).Return([]*rpc.TransactionSignature{}, nil).Once() + job := newGetSlotsForAddress(client, nil, nil, address, from, to) + err := job.Run(tests.Context(t)) + require.NoError(t, err) + requireJobIsDone(t, job.Done(), "expected job to be done") + }) + t.Run("Stores slots only if they are in range", func(t *testing.T) { + client := mocks.NewRPCClient(t) + var signatures []*rpc.TransactionSignature + for _, slot := range []uint64{21, 20, 11, 10, 9} { + if slot == 20 { + // must be skipped due to error + signatures = append(signatures, &rpc.TransactionSignature{Slot: 19, Err: errors.New("transaction failed")}) + } + if slot == 10 { + // add errored transaction before a valid into the last slot within range to ensure that we won't skip that slot + signatures = append(signatures, &rpc.TransactionSignature{Slot: 10, Err: errors.New("transaction failed")}) + } + signatures = append(signatures, &rpc.TransactionSignature{Slot: slot}) + } + client.EXPECT().GetSignaturesForAddressWithOpts(mock.Anything, mock.Anything, mock.Anything).Return(signatures, nil).Once() + var actualSlots []uint64 + job := newGetSlotsForAddress(client, nil, func(s uint64) { + actualSlots = append(actualSlots, s) + }, address, from, to) + err := job.Run(tests.Context(t)) + require.NoError(t, err) + requireJobIsDone(t, job.Done(), "expected job to be done") + require.Equal(t, []uint64{20, 11, 10}, actualSlots) + }) + t.Run("If slot range may have more signatures, schedules a new job", func(t *testing.T) { + client := mocks.NewRPCClient(t) + signatures := []*rpc.TransactionSignature{{Slot: 19, Signature: sig}} + client.EXPECT().GetSignaturesForAddressWithOpts(mock.Anything, mock.Anything, mock.Anything).Return(signatures, nil).Once() + workers := mocks.NewWorkerGroup(t) + var secondJob *getSlotsForAddressJob + workers.EXPECT().Do(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, rawJob worker.Job) error { + job, ok := rawJob.(*getSlotsForAddressJob) + require.True(t, ok) + require.Equal(t, from, job.from) + require.Equal(t, uint64(19), job.to) + require.Equal(t, address, job.address) + require.Equal(t, sig, job.beforeSig) + secondJob = job + return nil + }) + var actualSlots []uint64 + firstJob := newGetSlotsForAddress(client, workers, func(s uint64) { + actualSlots = append(actualSlots, s) + }, address, from, to) + err := firstJob.Run(tests.Context(t)) + require.NoError(t, err) + select { + case <-firstJob.Done(): + require.FailNow(t, "expected job to schedule second job and not to be done") + default: + } + require.NotNil(t, secondJob) + client.EXPECT().GetSignaturesForAddressWithOpts(mock.Anything, mock.Anything, mock.Anything).Return([]*rpc.TransactionSignature{{Slot: 18}, {Slot: 9}}, nil).Once() + err = secondJob.Run(tests.Context(t)) + require.NoError(t, err) + requireJobIsDone(t, firstJob.Done(), "expected fist job to be done") + requireJobIsDone(t, secondJob.Done(), "expected second job to be done") + require.Equal(t, []uint64{19, 18}, actualSlots) + }) +} diff --git a/pkg/solana/logpoller/loader.go b/pkg/solana/logpoller/loader.go index 7cbde75da..7bb704624 100644 --- a/pkg/solana/logpoller/loader.go +++ b/pkg/solana/logpoller/loader.go @@ -37,6 +37,10 @@ type RPCClient interface { GetSignaturesForAddressWithOpts(context.Context, solana.PublicKey, *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error) GetSlot(ctx context.Context, commitment rpc.CommitmentType) (uint64, error) } + +type WorkerGroup interface { + Do(ctx context.Context, job worker.Job) error +} type EncodedLogCollector struct { // service state management services.Service diff --git a/pkg/solana/logpoller/mocks/worker_group.go b/pkg/solana/logpoller/mocks/worker_group.go new file mode 100644 index 000000000..e4a98bdec --- /dev/null +++ b/pkg/solana/logpoller/mocks/worker_group.go @@ -0,0 +1,85 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + + worker "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/worker" +) + +// WorkerGroup is an autogenerated mock type for the WorkerGroup type +type WorkerGroup struct { + mock.Mock +} + +type WorkerGroup_Expecter struct { + mock *mock.Mock +} + +func (_m *WorkerGroup) EXPECT() *WorkerGroup_Expecter { + return &WorkerGroup_Expecter{mock: &_m.Mock} +} + +// Do provides a mock function with given fields: ctx, job +func (_m *WorkerGroup) Do(ctx context.Context, job worker.Job) error { + ret := _m.Called(ctx, job) + + if len(ret) == 0 { + panic("no return value specified for Do") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, worker.Job) error); ok { + r0 = rf(ctx, job) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// WorkerGroup_Do_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Do' +type WorkerGroup_Do_Call struct { + *mock.Call +} + +// Do is a helper method to define mock.On call +// - ctx context.Context +// - job worker.Job +func (_e *WorkerGroup_Expecter) Do(ctx interface{}, job interface{}) *WorkerGroup_Do_Call { + return &WorkerGroup_Do_Call{Call: _e.mock.On("Do", ctx, job)} +} + +func (_c *WorkerGroup_Do_Call) Run(run func(ctx context.Context, job worker.Job)) *WorkerGroup_Do_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(worker.Job)) + }) + return _c +} + +func (_c *WorkerGroup_Do_Call) Return(_a0 error) *WorkerGroup_Do_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *WorkerGroup_Do_Call) RunAndReturn(run func(context.Context, worker.Job) error) *WorkerGroup_Do_Call { + _c.Call.Return(run) + return _c +} + +// NewWorkerGroup creates a new instance of WorkerGroup. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewWorkerGroup(t interface { + mock.TestingT + Cleanup(func()) +}) *WorkerGroup { + mock := &WorkerGroup{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}