implement LogPoller's backfill and processBlocks
dhaidashenko committed Jan 13, 2025
1 parent dd8ff03 commit 6b785cf
Showing 10 changed files with 232 additions and 241 deletions.
2 changes: 1 addition & 1 deletion pkg/solana/logpoller/blocks_sorter.go
@@ -32,7 +32,7 @@ func newBlocksSorter(inBlocks <-chan Block, lggr logger.Logger, expectedBlocks [
queue: list.New(),
readyBlocks: make(map[uint64]Block),
inBlocks: inBlocks,
outBlocks: make(chan Block),
outBlocks: make(chan Block, 16),
receivedNewBlock: make(chan struct{}, 1),
lggr: lggr,
}
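The one-line change above gives the sorter's output channel a buffer of 16 blocks, so the sorter can keep draining inBlocks while a slower consumer catches up. A minimal, self-contained sketch of what that buffer buys (illustrative only, not the blocksSorter code):

    package main

    import "fmt"

    func main() {
        // With a buffer of 16, the producer can run up to 16 blocks ahead of the
        // consumer before a send blocks; with an unbuffered channel every send
        // would wait for the consumer to be ready.
        out := make(chan uint64, 16)

        go func() {
            defer close(out)
            for slot := uint64(0); slot < 32; slot++ {
                out <- slot
            }
        }()

        for slot := range out {
            fmt.Println("consumed block at slot", slot)
        }
    }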
23 changes: 23 additions & 0 deletions pkg/solana/logpoller/filters.go
@@ -182,6 +182,29 @@ func (fl *filters) removeFilterFromIndexes(filter Filter) {
}
}

func (fl *filters) GetDistinctAddresses(ctx context.Context) ([]PublicKey, error) {
err := fl.LoadFilters(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load filters: %w", err)
}

fl.filtersMutex.RLock()
defer fl.filtersMutex.RUnlock()

var result []PublicKey
set := map[PublicKey]struct{}{}
for _, filter := range fl.filtersByID {
if _, ok := set[filter.Address]; ok {
continue
}

set[filter.Address] = struct{}{}
result = append(result, filter.Address)
}

return result, nil
}

// MatchingFilters - returns iterator to go through all matching filters.
// Requires LoadFilters to be called at least once.
func (fl *filters) MatchingFilters(addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] {
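A hedged sketch of how this accessor is likely consumed by the backfill path. The wiring below (the backfillFilteredAddresses helper, the from/to arguments, and the exact BackfillForAddresses signature) is an assumption for illustration, not code from this commit:

    // Illustrative only: feed the distinct filter addresses into the collector's backfill.
    func backfillFilteredAddresses(ctx context.Context, flt *filters, collector *EncodedLogCollector, from, to uint64) error {
        addresses, err := flt.GetDistinctAddresses(ctx)
        if err != nil {
            return fmt.Errorf("failed to get distinct addresses: %w", err)
        }
        if len(addresses) == 0 {
            return nil // nothing to backfill
        }

        blocks, cleanUp, err := collector.BackfillForAddresses(ctx, addresses, from, to)
        if err != nil {
            return fmt.Errorf("failed to start backfill: %w", err)
        }
        defer cleanUp()

        for block := range blocks {
            _ = block // process and persist the block's logs here
        }
        return nil
    }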
5 changes: 0 additions & 5 deletions pkg/solana/logpoller/job.go
@@ -37,8 +37,3 @@ type eventDetail struct {
trxIdx int
trxSig solana.Signature
}

type wrappedParser interface {
ProgramEventProcessor
ExpectBlock(uint64)
}
4 changes: 4 additions & 0 deletions pkg/solana/logpoller/job_get_block.go
@@ -36,10 +36,14 @@ func (j *getBlockJob) Done() <-chan struct{} {

func (j *getBlockJob) Run(ctx context.Context) error {
var excludeRewards bool
// TODO: move min version definition to an RPC Client
// NOTE: if the max supported transaction version is changed after the creation of a block that contains transactions of a new version,
// we are at risk of producing duplicate events! To avoid this we'll need to do a block-based migration.
version := uint64(0) // pull all tx types (legacy + v0)
block, err := j.client.GetBlockWithOpts(
ctx,
j.slotNumber,
// NOTE: any change to the filtering arguments may affect the calculation of logIndex, which may lead to event duplication.
&rpc.GetBlockOpts{
Encoding: solana.EncodingBase64,
Commitment: rpc.CommitmentFinalized,
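For reference, a hedged sketch of the GetBlockWithOpts call the NOTE above is guarding. Field names come from github.com/gagliardetto/solana-go/rpc; the options beyond the ones visible in this hunk (transaction details, rewards) are assumptions:

    // Sketch: fetch a finalized block including both legacy and v0 transactions.
    // Pinning MaxSupportedTransactionVersion matters: raising it later would make a
    // re-fetched block return extra transactions, shifting logIndex and producing
    // duplicate events for already-processed logs.
    maxTxVersion := uint64(0) // pull all tx types (legacy + v0)
    includeRewards := false
    block, err := j.client.GetBlockWithOpts(ctx, j.slotNumber, &rpc.GetBlockOpts{
        Encoding:                       solana.EncodingBase64,
        Commitment:                     rpc.CommitmentFinalized,
        TransactionDetails:             rpc.TransactionDetailsFull,
        Rewards:                        &includeRewards,
        MaxSupportedTransactionVersion: &maxTxVersion,
    })
    if err != nil {
        return fmt.Errorf("failed to get block for slot %d: %w", j.slotNumber, err)
    }
    _ = block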
6 changes: 4 additions & 2 deletions pkg/solana/logpoller/job_get_slots_for_addr.go
@@ -85,9 +85,11 @@ func (f *getSlotsForAddressJob) run(ctx context.Context) (bool, error) {
continue
}

// TODO: ignore slots that are higher than to

lowestSlot = sig.Slot
// RPC may return slots that are higher than requested. Skip them to keep the mental model simple.
if sig.Slot > f.to {
continue
}
f.storeSlot(sig.Slot)
if sig.Slot <= f.from {
return true, nil
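For orientation, a hedged sketch of the signature-paging pattern this job is built around: walk getSignaturesForAddress backwards page by page and keep only slots inside [from, to]. The helper name, page size, and stop conditions below are illustrative; the real job also tracks lowestSlot and resumes across calls:

    // Illustrative only: collect slots for an address within [from, to].
    func slotsForAddress(ctx context.Context, client RPCClient, addr solana.PublicKey, from, to uint64) ([]uint64, error) {
        var slots []uint64
        var before solana.Signature // zero value: start from the most recent signature
        limit := 1000

        for {
            sigs, err := client.GetSignaturesForAddressWithOpts(ctx, addr, &rpc.GetSignaturesForAddressOpts{
                Before:     before,
                Limit:      &limit,
                Commitment: rpc.CommitmentFinalized,
            })
            if err != nil {
                return nil, fmt.Errorf("failed to get signatures for %s: %w", addr, err)
            }
            if len(sigs) == 0 {
                return slots, nil // reached the beginning of the address history
            }

            for _, sig := range sigs {
                if sig.Slot > to {
                    continue // RPC may return slots above the requested range
                }
                if sig.Slot < from {
                    return slots, nil // walked below the range; done
                }
                slots = append(slots, sig.Slot)
            }
            before = sigs[len(sigs)-1].Signature // next page starts below the oldest signature seen
        }
    }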
202 changes: 7 additions & 195 deletions pkg/solana/logpoller/loader.go
@@ -5,8 +5,6 @@ import (
"fmt"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
@@ -35,42 +33,27 @@ type RPCClient interface {
GetBlocks(ctx context.Context, startSlot uint64, endSlot *uint64, commitment rpc.CommitmentType) (out rpc.BlocksResult, err error)
GetBlockWithOpts(context.Context, uint64, *rpc.GetBlockOpts) (*rpc.GetBlockResult, error)
GetSignaturesForAddressWithOpts(context.Context, solana.PublicKey, *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error)
GetSlot(ctx context.Context, commitment rpc.CommitmentType) (uint64, error)
}

const (
DefaultNextSlotPollingInterval = 1_000 * time.Millisecond
)

type EncodedLogCollector struct {
// service state management
services.Service
engine *services.Engine

// dependencies and configuration
client RPCClient
lggr logger.Logger
rpcTimeLimit time.Duration
client RPCClient
lggr logger.Logger

// internal state
chSlot chan uint64
chBlock chan uint64
chJobs chan Job
workers *WorkerGroup

lastSentSlot atomic.Uint64
}

func NewEncodedLogCollector(
client RPCClient,
lggr logger.Logger,
) *EncodedLogCollector {
c := &EncodedLogCollector{
client: client,
chSlot: make(chan uint64),
chBlock: make(chan uint64, 1),
chJobs: make(chan Job, 1),
lggr: lggr,
rpcTimeLimit: 1 * time.Second,
client: client,
lggr: lggr,
}

c.Service, c.engine = services.Config{
@@ -80,8 +63,6 @@ func NewEncodedLogCollector(

return []services.Service{c.workers}
},
Start: c.start,
Close: c.close,
}.NewServiceEngine(lggr)

return c
@@ -134,7 +115,7 @@ func (c *EncodedLogCollector) scheduleBlocksFetching(ctx context.Context, slots
}
}

go func() {
c.engine.Go(func(ctx context.Context) {
for _, job := range getBlocksJobs {
select {
case <-ctx.Done():
@@ -144,7 +125,7 @@
}
}
close(blocks)
}()
})

return blocks, nil
}
@@ -176,172 +157,3 @@ func (c *EncodedLogCollector) BackfillForAddresses(ctx context.Context, addresse

return sortedBlocks, cleanUp, nil
}

func (c *EncodedLogCollector) start(_ context.Context) error {
//c.engine.Go(c.runSlotPolling)
//c.engine.Go(c.runJobProcessing)

return nil
}

func (c *EncodedLogCollector) close() error {
return nil
}

func (c *EncodedLogCollector) runSlotPolling(ctx context.Context) {
for {
// TODO: fetch slots using getBlocksWithLimit RPC Method
timer := time.NewTimer(DefaultNextSlotPollingInterval)

select {
case <-ctx.Done():
timer.Stop()

return
case <-timer.C:
ctxB, cancel := context.WithTimeout(ctx, c.rpcTimeLimit)

// not to be run as a job, but as a blocking call
result, err := c.client.GetLatestBlockhash(ctxB, rpc.CommitmentFinalized)
if err != nil {
c.lggr.Error("failed to get latest blockhash", "err", err)
cancel()

continue
}

cancel()

slot := result.Context.Slot
// if the slot is not higher than the highest slot, skip it
if c.lastSentSlot.Load() >= slot {
continue
}

select {
case c.chSlot <- slot:
c.lggr.Debugw("Fetched new slot and sent it for processing", "slot", slot)
c.lastSentSlot.Store(slot)
default:
}
}

timer.Stop()
}
}

//func (c *EncodedLogCollector) runSlotProcessing(ctx context.Context) {
// start := uint64(0)
// for {
// select {
// case <-ctx.Done():
// return
// case end := <-c.chSlot:
// if start >= end {
// continue
// }
//
// if start == 0 {
// start = end // TODO: should be loaded from db or passed into EncodedLogCollector as arg
// }
// // load blocks in slot range
// if err := c.loadSlotBlocksRange(ctx, start, end); err != nil {
// // a retry will happen anyway on the next round of slots
// // so the error is handled by doing nothing
// c.lggr.Errorw("failed to load slot blocks range", "start", start, "end", end, "err", err)
// continue
// }
//
// start = end + 1
// }
// }
//}

//func (c *EncodedLogCollector) runBlockProcessing(ctx context.Context) {
// for {
// select {
// case <-ctx.Done():
// return
// case slot := <-c.chBlock:
// if err := c.workers.Do(ctx, &getBlockJob{
// slotNumber: slot,
// client: c.client,
// parser: c.ordered,
// chJobs: c.chJobs,
// }); err != nil {
// c.lggr.Errorf("failed to add job to queue: %s", err)
// }
// }
// }
//}

func (c *EncodedLogCollector) runJobProcessing(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case job := <-c.chJobs:
if err := c.workers.Do(ctx, job); err != nil {
c.lggr.Errorf("failed to add job to queue: %s", err)
}
}
}
}

//func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, end uint64) error {
// if start >= end {
// return errors.New("the start block must come before the end block")
// }
//
// var (
// result rpc.BlocksResult
// err error
// )
//
// rpcCtx, cancel := context.WithTimeout(ctx, c.rpcTimeLimit)
// defer cancel()
//
// if result, err = c.client.GetBlocks(rpcCtx, start, &end, rpc.CommitmentFinalized); err != nil {
// return err
// }
//
// c.lggr.Debugw("loaded blocks for slots range", "start", start, "end", end, "blocks", len(result))
//
// // as a safety mechanism, order the blocks ascending (oldest to newest) in the extreme case
// // that the RPC changes and results get jumbled.
// slices.SortFunc(result, func(a, b uint64) int {
// if a < b {
// return -1
// } else if a > b {
// return 1
// }
//
// return 0
// })
//
// for _, block := range result {
// c.ordered.ExpectBlock(block)
//
// select {
// case <-ctx.Done():
// return nil
// case c.chBlock <- block:
// }
// }
//
// return nil
//}

type unorderedParser struct {
parser ProgramEventProcessor
}

func newUnorderedParser(parser ProgramEventProcessor) *unorderedParser {
return &unorderedParser{parser: parser}
}

func (p *unorderedParser) ExpectBlock(_ uint64) {}
func (p *unorderedParser) ExpectTxs(_ uint64, _ int) {}
func (p *unorderedParser) Process(block Block) error {
return p.parser.Process(block)
}
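With the slot-polling loop, job channels, and commented-out processing paths removed, the collector no longer drives itself; the caller starts it and explicitly requests backfills. A hedged sketch of how that might look from the LogPoller side; Start/Close come from the embedded services.Service, while processBlock and the slot arguments are hypothetical placeholders:

    // Illustrative only: driving the slimmed-down collector.
    collector := NewEncodedLogCollector(client, lggr)
    if err := collector.Start(ctx); err != nil {
        return err
    }
    defer collector.Close()

    blocks, cleanUp, err := collector.BackfillForAddresses(ctx, addresses, fromSlot, toSlot)
    if err != nil {
        return fmt.Errorf("failed to start backfill: %w", err)
    }
    defer cleanUp()

    // Blocks arrive ordered by slot (see blocks_sorter.go); process them as they come.
    for block := range blocks {
        if err := processBlock(ctx, block); err != nil {
            return err
        }
    }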