Skip to content

Commit

Permalink
log poller tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaidashenko committed Jan 15, 2025
1 parent f07cc75 commit 82d579b
Show file tree
Hide file tree
Showing 6 changed files with 764 additions and 37 deletions.
12 changes: 12 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ packages:
interfaces:
RPCClient:
WorkerGroup:
filtersI:
config:
inpackage: True
dir: "pkg/solana/logpoller"
filename: mock_filters.go
mockname: mockFilters
logsLoader:
config:
inpackage: True
dir: "pkg/solana/logpoller"
filename: mock_logs_loader.go
mockname: mockLogsLoader
ORM:
config:
inpackage: True
Expand Down
6 changes: 3 additions & 3 deletions pkg/solana/logpoller/filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,8 @@ func TestFilters_GetFiltersToBackfill(t *testing.T) {
require.NoError(t, filters.RegisterFilter(tests.Context(t), notBackfilled))
ensureInQueue(notBackfilled)
// new filter is always added to the queue
newFilter := Filter{Name: "new filter", ID: 3}
orm.EXPECT().InsertFilter(mock.Anything, newFilter).Return(newFilter.ID, nil).Once()
newFilter := Filter{Name: "new filter"}
orm.EXPECT().InsertFilter(mock.Anything, newFilter).Return(3, nil).Once()
require.NoError(t, filters.RegisterFilter(tests.Context(t), newFilter))
ensureInQueue(notBackfilled, newFilter)
ensureInQueue(notBackfilled, Filter{ID: 3, Name: "new filter"})
}
64 changes: 30 additions & 34 deletions pkg/solana/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ type logsLoader interface {
BackfillForAddresses(ctx context.Context, addresses []PublicKey, fromSlot, toSlot uint64) (orderedBlocks <-chan Block, cleanUp func(), err error)
}

type filtersI interface {
RegisterFilter(ctx context.Context, filter Filter) error
UnregisterFilter(ctx context.Context, name string) error
LoadFilters(ctx context.Context) error
PruneFilters(ctx context.Context) error
GetDistinctAddresses(ctx context.Context) ([]PublicKey, error)
GetFiltersToBackfill() []Filter
MarkFilterBackfilled(ctx context.Context, filterID int64) error
}

type LogPoller struct {
services.Service
eng *services.Engine
Expand All @@ -39,8 +49,7 @@ type LogPoller struct {
lastProcessedSlot int64
client RPCClient
loader logsLoader

filters *filters
filters filtersI
}

func New(lggr logger.SugaredLogger, orm ORM, client RPCClient) *LogPoller {
Expand All @@ -65,7 +74,11 @@ func New(lggr logger.SugaredLogger, orm ORM, client RPCClient) *LogPoller {
return lp
}

func (lp *LogPoller) start(context.Context) error {
func (lp *LogPoller) start(ctx context.Context) error {
err := lp.filters.LoadFilters(ctx)
if err != nil {
return fmt.Errorf("failed loading filters: %w", err)
}
lp.eng.GoTick(services.NewTicker(time.Second), func(ctx context.Context) {
err := lp.run(ctx)
if err != nil {
Expand All @@ -90,26 +103,6 @@ func (lp *LogPoller) UnregisterFilter(ctx context.Context, name string) error {
return lp.filters.UnregisterFilter(ctx, name)
}

func (lp *LogPoller) loadFilters(ctx context.Context) error {
retryTicker := services.TickerConfig{Initial: 0, JitterPct: services.DefaultJitter}.NewTicker(time.Second)
defer retryTicker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-retryTicker.C:
}
err := lp.filters.LoadFilters(ctx)
if err != nil {
lp.lggr.Errorw("Failed loading filters in init logpoller loop, retrying later", "err", err)
continue
}

return nil
}
}

func (lp *LogPoller) getLastProcessedSlot(ctx context.Context) (int64, error) {
if lp.lastProcessedSlot != 0 {
return lp.lastProcessedSlot, nil
Expand All @@ -132,7 +125,7 @@ func (lp *LogPoller) getLastProcessedSlot(ctx context.Context) (int64, error) {
if latestBlock == 0 {
return 0, fmt.Errorf("latest finalized slot is 0 - waiting for next slot to start processing")
}
return int64(latestBlock) - 1, err
return int64(latestBlock) - 1, nil
}

func (lp *LogPoller) backfillFilters(ctx context.Context, filters []Filter, to int64) error {
Expand All @@ -155,7 +148,10 @@ func (lp *LogPoller) backfillFilters(ctx context.Context, filters []Filter, to i
}

for _, filter := range filters {
err = errors.Join(err, lp.filters.MarkFilterBackfilled(ctx, filter.ID))
filterErr := lp.filters.MarkFilterBackfilled(ctx, filter.ID)
if filterErr != nil {
err = errors.Join(err, fmt.Errorf("failed to mark filter %d backfilled: %w", filter.ID, filterErr))
}
}

return err
Expand All @@ -179,7 +175,7 @@ consumedAllBlocks:
}

batch := []Block{block}
batch = appendBuffered(blocks, batch)
batch = appendBuffered(blocks, 16, batch)
err = lp.processBlocks(ctx, batch)
if err != nil {
return fmt.Errorf("error processing blocks: %w", err)
Expand All @@ -190,7 +186,7 @@ consumedAllBlocks:
return nil
}

func appendBuffered(ch <-chan Block, blocks []Block) []Block {
func appendBuffered(ch <-chan Block, max int, blocks []Block) []Block {
for {
select {
case block, ok := <-ch:
Expand All @@ -199,6 +195,9 @@ func appendBuffered(ch <-chan Block, blocks []Block) []Block {
}

blocks = append(blocks, block)
if len(blocks) >= max {
return blocks
}
default:
return blocks
}
Expand All @@ -211,11 +210,6 @@ func (lp *LogPoller) processBlocks(ctx context.Context, blocks []Block) error {
}

func (lp *LogPoller) run(ctx context.Context) error {
err := lp.loadFilters(ctx)
if err != nil {
return fmt.Errorf("failed loading filters: %w", err)
}

lastProcessedSlot, err := lp.getLastProcessedSlot(ctx)
if err != nil {
return fmt.Errorf("failed getting last processed slot: %w", err)
Expand All @@ -231,7 +225,9 @@ func (lp *LogPoller) run(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed getting addresses: %w", err)
}

if len(addresses) == 0 {
return nil
}
highestSlot, err := lp.client.GetSlot(ctx, rpc.CommitmentFinalized)
if err != nil {
return fmt.Errorf("failed getting highest slot: %w", err)
Expand All @@ -241,7 +237,7 @@ func (lp *LogPoller) run(ctx context.Context) error {
return fmt.Errorf("last processed slot %d is higher than highest RPC slot %d", lastProcessedSlot, highestSlot)
}

err = lp.processBlocksRange(ctx, addresses, lastProcessedSlot, int64(highestSlot))
err = lp.processBlocksRange(ctx, addresses, lastProcessedSlot+1, int64(highestSlot))
if err != nil {
return fmt.Errorf("failed processing block range [%d, %d]: %w", lastProcessedSlot+1, highestSlot, err)
}
Expand Down
Loading

0 comments on commit 82d579b

Please sign in to comment.