Skip to content

Commit

Permalink
Fixed Smoke Test
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaidashenko committed Jan 15, 2025
1 parent 82d579b commit 2e8f6cf
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 91 deletions.
2 changes: 1 addition & 1 deletion .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ packages:
inpackage: True
dir: "pkg/solana/logpoller"
filename: mock_orm.go
mockname: mockORM
mockname: MockORM

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/sha256"
"database/sql"
"encoding/base64"
"fmt"
"os"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/gagliardetto/solana-go/rpc"
"github.com/gagliardetto/solana-go/rpc/ws"
"github.com/gagliardetto/solana-go/text"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -61,15 +63,17 @@ func TestEventLoader(t *testing.T) {
totalLogsToSend := 30
parser := &printParser{t: t}
sender := newLogSender(t, rpcClient, wsClient)
collector := logpoller.NewEncodedLogCollector(
rpcClient,
parser,
logger.Nop(),
)
orm := logpoller.NewMockORM(t) // TODO: replace with real DB, when available
programPubKey, err := solana.PublicKeyFromBase58(programPubKey)
require.NoError(t, err)
orm.EXPECT().SelectFilters(mock.Anything).Return([]logpoller.Filter{{IsBackfilled: false, Address: logpoller.PublicKey(programPubKey)}}, nil).Once()
orm.EXPECT().MarkFilterBackfilled(mock.Anything, mock.Anything).Return(nil).Once()
orm.EXPECT().GetLatestBlock(mock.Anything).Return(0, sql.ErrNoRows)
lp := logpoller.NewWithCustomProcessor(logger.TestSugared(t), orm, rpcClient, parser.ProcessBlocks)

require.NoError(t, collector.Start(ctx))
require.NoError(t, lp.Start(ctx))
t.Cleanup(func() {
require.NoError(t, collector.Close())
require.NoError(t, lp.Close())
})

go func(ctx context.Context, sender *logSender, privateKey *solana.PrivateKey) {
Expand Down Expand Up @@ -143,7 +147,18 @@ type printParser struct {
values []uint64
}

func (p *printParser) Process(block logpoller.Block) error {
func (p *printParser) ProcessBlocks(ctx context.Context, blocks []logpoller.Block) error {
for _, b := range blocks {
err := p.process(b)
if err != nil {
return err
}
}

return nil
}

func (p *printParser) process(block logpoller.Block) error {
p.t.Helper()

sum := sha256.Sum256([]byte("event:TestEvent"))
Expand Down
28 changes: 14 additions & 14 deletions pkg/solana/logpoller/filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

func TestFilters_LoadFilters(t *testing.T) {
orm := newMockORM(t)
orm := NewMockORM(t)
fs := newFilters(logger.Sugared(logger.Test(t)), orm)
ctx := tests.Context(t)
orm.On("SelectFilters", mock.Anything).Return(nil, errors.New("db failed")).Once()
Expand Down Expand Up @@ -65,13 +65,13 @@ func TestFilters_LoadFilters(t *testing.T) {
func TestFilters_RegisterFilter(t *testing.T) {
lggr := logger.Sugared(logger.Test(t))
t.Run("Returns an error if name is empty", func(t *testing.T) {
orm := newMockORM(t)
orm := NewMockORM(t)
fs := newFilters(lggr, orm)
err := fs.RegisterFilter(tests.Context(t), Filter{})
require.EqualError(t, err, "name is required")
})
t.Run("Returns an error if fails to load filters from db", func(t *testing.T) {
orm := newMockORM(t)
orm := NewMockORM(t)
fs := newFilters(lggr, orm)
orm.On("SelectFilters", mock.Anything).Return(nil, errors.New("db failed")).Once()
err := fs.RegisterFilter(tests.Context(t), Filter{Name: "Filter"})
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestFilters_RegisterFilter(t *testing.T) {
}
for _, tc := range testCases {
t.Run(fmt.Sprintf("Updating %s", tc.Name), func(t *testing.T) {
orm := newMockORM(t)
orm := NewMockORM(t)
fs := newFilters(lggr, orm)
const filterName = "Filter"
dbFilter := Filter{Name: filterName}
Expand All @@ -124,7 +124,7 @@ func TestFilters_RegisterFilter(t *testing.T) {
}
})
t.Run("Happy path", func(t *testing.T) {
orm := newMockORM(t)
orm := NewMockORM(t)
fs := newFilters(lggr, orm)
const filterName = "Filter"
orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once()
Expand All @@ -150,7 +150,7 @@ func TestFilters_RegisterFilter(t *testing.T) {
require.Equal(t, filter, storedFilters[0])
})
t.Run("Can reregister after unregister", func(t *testing.T) {
orm := newMockORM(t)
orm := NewMockORM(t)
fs := newFilters(lggr, orm)
const filterName = "Filter"
orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once()
Expand All @@ -174,22 +174,22 @@ func TestFilters_RegisterFilter(t *testing.T) {
func TestFilters_UnregisterFilter(t *testing.T) {
lggr := logger.Sugared(logger.Test(t))
t.Run("Returns an error if fails to load filters from db", func(t *testing.T) {
orm := newMockORM(t)
orm := NewMockORM(t)
fs := newFilters(lggr, orm)
orm.On("SelectFilters", mock.Anything).Return(nil, errors.New("db failed")).Once()
err := fs.UnregisterFilter(tests.Context(t), "Filter")
require.EqualError(t, err, "failed to load filters: failed to select filters from db: db failed")
})
t.Run("Noop if filter is not present", func(t *testing.T) {
orm := newMockORM(t)
orm := NewMockORM(t)
fs := newFilters(lggr, orm)
const filterName = "Filter"
orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once()
err := fs.UnregisterFilter(tests.Context(t), filterName)
require.NoError(t, err)
})
t.Run("Returns error if fails to mark filter as deleted", func(t *testing.T) {
orm := newMockORM(t)
orm := NewMockORM(t)
fs := newFilters(lggr, orm)
const filterName = "Filter"
const id int64 = 10
Expand All @@ -199,7 +199,7 @@ func TestFilters_UnregisterFilter(t *testing.T) {
require.EqualError(t, err, "failed to mark filter deleted: db query failed")
})
t.Run("Happy path", func(t *testing.T) {
orm := newMockORM(t)
orm := NewMockORM(t)
fs := newFilters(lggr, orm)
const filterName = "Filter"
const id int64 = 10
Expand All @@ -217,7 +217,7 @@ func TestFilters_UnregisterFilter(t *testing.T) {
func TestFilters_PruneFilters(t *testing.T) {
lggr := logger.Sugared(logger.Test(t))
t.Run("Happy path", func(t *testing.T) {
orm := newMockORM(t)
orm := NewMockORM(t)
fs := newFilters(lggr, orm)
toDelete := Filter{
ID: 1,
Expand All @@ -237,7 +237,7 @@ func TestFilters_PruneFilters(t *testing.T) {
require.Len(t, fs.filtersToDelete, 0)
})
t.Run("If DB removal fails will add filters back into removal slice ", func(t *testing.T) {
orm := newMockORM(t)
orm := NewMockORM(t)
fs := newFilters(lggr, orm)
toDelete := Filter{
ID: 1,
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestFilters_PruneFilters(t *testing.T) {
}

func TestFilters_MatchingFilters(t *testing.T) {
orm := newMockORM(t)
orm := NewMockORM(t)
lggr := logger.Sugared(logger.Test(t))
expectedFilter1 := Filter{
ID: 1,
Expand Down Expand Up @@ -310,7 +310,7 @@ func TestFilters_MatchingFilters(t *testing.T) {
}

func TestFilters_GetFiltersToBackfill(t *testing.T) {
orm := newMockORM(t)
orm := NewMockORM(t)
lggr := logger.Sugared(logger.Test(t))
backfilledFilter := Filter{
ID: 1,
Expand Down
19 changes: 17 additions & 2 deletions pkg/solana/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type LogPoller struct {
client RPCClient
loader logsLoader
filters filtersI
processBlocks func(ctx context.Context, blocks []Block) error // TODO: introduced for smoke test. Remove after NONEVM-916 is merged
}

func New(lggr logger.SugaredLogger, orm ORM, client RPCClient) *LogPoller {
Expand All @@ -61,6 +62,8 @@ func New(lggr logger.SugaredLogger, orm ORM, client RPCClient) *LogPoller {
client: client,
}

lp.processBlocks = lp.processBlocksImpl

lp.Service, lp.eng = services.Config{
Name: "LogPollerService",
Start: lp.start,
Expand All @@ -74,6 +77,12 @@ func New(lggr logger.SugaredLogger, orm ORM, client RPCClient) *LogPoller {
return lp
}

func NewWithCustomProcessor(lggr logger.SugaredLogger, orm ORM, client RPCClient, processBlocks func(ctx context.Context, blocks []Block) error) *LogPoller {
lp := New(lggr, orm, client)
lp.processBlocks = processBlocks
return lp
}

func (lp *LogPoller) start(ctx context.Context) error {
err := lp.filters.LoadFilters(ctx)
if err != nil {
Expand Down Expand Up @@ -204,7 +213,7 @@ func appendBuffered(ch <-chan Block, max int, blocks []Block) []Block {
}
}

func (lp *LogPoller) processBlocks(ctx context.Context, blocks []Block) error {
func (lp *LogPoller) processBlocksImpl(ctx context.Context, blocks []Block) error {
// TODO: add logic implemented by NONEVM-916
return nil
}
Expand Down Expand Up @@ -233,10 +242,16 @@ func (lp *LogPoller) run(ctx context.Context) error {
return fmt.Errorf("failed getting highest slot: %w", err)
}

if lastProcessedSlot >= int64(highestSlot) {
if lastProcessedSlot > int64(highestSlot) {
return fmt.Errorf("last processed slot %d is higher than highest RPC slot %d", lastProcessedSlot, highestSlot)
}

if lastProcessedSlot == int64(highestSlot) {
lp.lggr.Debugw("RPC's latest finalized block is the same as latest processed - skipping", "lastProcessedSlot", lastProcessedSlot)
return nil
}

lp.lggr.Debugw("Got new slot range to process", "from", lastProcessedSlot+1, "to", highestSlot)
err = lp.processBlocksRange(ctx, addresses, lastProcessedSlot+1, int64(highestSlot))
if err != nil {
return fmt.Errorf("failed processing block range [%d, %d]: %w", lastProcessedSlot+1, highestSlot, err)
Expand Down
14 changes: 5 additions & 9 deletions pkg/solana/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

type mockedLP struct {
ORM *mockORM
ORM *MockORM
Client *mocks.RPCClient
Loader *mockLogsLoader
Filters *mockFilters
Expand All @@ -27,18 +27,14 @@ type mockedLP struct {

func newMockedLP(t *testing.T) mockedLP {
mockedLP := mockedLP{
ORM: newMockORM(t),
ORM: NewMockORM(t),
Client: mocks.NewRPCClient(t),
Loader: newMockLogsLoader(t),
Filters: newMockFilters(t),
}
mockedLP.LogPoller = &LogPoller{
lggr: logger.TestSugared(t),
orm: mockedLP.ORM,
client: mockedLP.Client,
loader: mockedLP.Loader,
filters: mockedLP.Filters,
}
mockedLP.LogPoller = New(logger.TestSugared(t), mockedLP.ORM, mockedLP.Client)
mockedLP.LogPoller.loader = mockedLP.Loader
mockedLP.LogPoller.filters = mockedLP.Filters
return mockedLP
}

Expand Down
Loading

0 comments on commit 2e8f6cf

Please sign in to comment.