diff --git a/pkg/solana/logpoller/blocks_sorter.go b/pkg/solana/logpoller/blocks_sorter.go index 93fb79696..86b7ae423 100644 --- a/pkg/solana/logpoller/blocks_sorter.go +++ b/pkg/solana/logpoller/blocks_sorter.go @@ -32,7 +32,7 @@ func newBlocksSorter(inBlocks <-chan Block, lggr logger.Logger, expectedBlocks [ queue: list.New(), readyBlocks: make(map[uint64]Block), inBlocks: inBlocks, - outBlocks: make(chan Block), + outBlocks: make(chan Block, 16), receivedNewBlock: make(chan struct{}, 1), lggr: lggr, } diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go index 4a1496371..a0b17af52 100644 --- a/pkg/solana/logpoller/filters.go +++ b/pkg/solana/logpoller/filters.go @@ -182,6 +182,29 @@ func (fl *filters) removeFilterFromIndexes(filter Filter) { } } +func (fl *filters) GetDistinctAddresses(ctx context.Context) ([]PublicKey, error) { + err := fl.LoadFilters(ctx) + if err != nil { + return nil, fmt.Errorf("failed to load filters: %w", err) + } + + fl.filtersMutex.RLock() + defer fl.filtersMutex.RUnlock() + + var result []PublicKey + set := map[PublicKey]struct{}{} + for _, filter := range fl.filtersByID { + if _, ok := set[filter.Address]; ok { + continue + } + + set[filter.Address] = struct{}{} + result = append(result, filter.Address) + } + + return result, nil +} + // MatchingFilters - returns iterator to go through all matching filters. // Requires LoadFilters to be called at least once. func (fl *filters) MatchingFilters(addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] { diff --git a/pkg/solana/logpoller/job.go b/pkg/solana/logpoller/job.go index 24078a953..8c1ddb94a 100644 --- a/pkg/solana/logpoller/job.go +++ b/pkg/solana/logpoller/job.go @@ -37,8 +37,3 @@ type eventDetail struct { trxIdx int trxSig solana.Signature } - -type wrappedParser interface { - ProgramEventProcessor - ExpectBlock(uint64) -} diff --git a/pkg/solana/logpoller/job_get_block.go b/pkg/solana/logpoller/job_get_block.go index a557aa027..dfba2931e 100644 --- a/pkg/solana/logpoller/job_get_block.go +++ b/pkg/solana/logpoller/job_get_block.go @@ -36,10 +36,14 @@ func (j *getBlockJob) Done() <-chan struct{} { func (j *getBlockJob) Run(ctx context.Context) error { var excludeRewards bool + // TODO: move min version definition to an RPC Client + // NOTE: if the max supported transaction version is changed after creation of a block that contains transactions of a new version, + // we are at risk of producing duplicate events! To avoid this we'll need to do a block-based migration. version := uint64(0) // pull all tx types (legacy + v0) block, err := j.client.GetBlockWithOpts( ctx, j.slotNumber, + // NOTE: any change to the filtering arguments may affect the calculation of logIndex, which could lead to event duplication. &rpc.GetBlockOpts{ Encoding: solana.EncodingBase64, Commitment: rpc.CommitmentFinalized, diff --git a/pkg/solana/logpoller/job_get_slots_for_addr.go b/pkg/solana/logpoller/job_get_slots_for_addr.go index 49477c097..d7009df59 100644 --- a/pkg/solana/logpoller/job_get_slots_for_addr.go +++ b/pkg/solana/logpoller/job_get_slots_for_addr.go @@ -85,9 +85,11 @@ func (f *getSlotsForAddressJob) run(ctx context.Context) (bool, error) { continue } - // TODO: ignore slots that are higher than to - lowestSlot = sig.Slot + // RPC may return slots that are higher than requested. Skip them to simplify the mental model.
+ if sig.Slot > f.to { + continue + } f.storeSlot(sig.Slot) if sig.Slot <= f.from { return true, nil diff --git a/pkg/solana/logpoller/loader.go b/pkg/solana/logpoller/loader.go index 7c83b3671..23ed83c24 100644 --- a/pkg/solana/logpoller/loader.go +++ b/pkg/solana/logpoller/loader.go @@ -5,8 +5,6 @@ import ( "fmt" "sort" "sync" - "sync/atomic" - "time" "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" @@ -35,29 +33,18 @@ type RPCClient interface { GetBlocks(ctx context.Context, startSlot uint64, endSlot *uint64, commitment rpc.CommitmentType) (out rpc.BlocksResult, err error) GetBlockWithOpts(context.Context, uint64, *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) GetSignaturesForAddressWithOpts(context.Context, solana.PublicKey, *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error) + GetSlot(ctx context.Context, commitment rpc.CommitmentType) (uint64, error) } - -const ( - DefaultNextSlotPollingInterval = 1_000 * time.Millisecond -) - type EncodedLogCollector struct { // service state management services.Service engine *services.Engine // dependencies and configuration - client RPCClient - lggr logger.Logger - rpcTimeLimit time.Duration + client RPCClient + lggr logger.Logger - // internal state - chSlot chan uint64 - chBlock chan uint64 - chJobs chan Job workers *WorkerGroup - - lastSentSlot atomic.Uint64 } func NewEncodedLogCollector( @@ -65,12 +52,8 @@ func NewEncodedLogCollector( lggr logger.Logger, ) *EncodedLogCollector { c := &EncodedLogCollector{ - client: client, - chSlot: make(chan uint64), - chBlock: make(chan uint64, 1), - chJobs: make(chan Job, 1), - lggr: lggr, - rpcTimeLimit: 1 * time.Second, + client: client, + lggr: lggr, } c.Service, c.engine = services.Config{ @@ -80,8 +63,6 @@ func NewEncodedLogCollector( return []services.Service{c.workers} }, - Start: c.start, - Close: c.close, }.NewServiceEngine(lggr) return c @@ -134,7 +115,7 @@ func (c *EncodedLogCollector) scheduleBlocksFetching(ctx context.Context, slots } } - go func() { + c.engine.Go(func(ctx context.Context) { for _, job := range getBlocksJobs { select { case <-ctx.Done(): @@ -144,7 +125,7 @@ func (c *EncodedLogCollector) scheduleBlocksFetching(ctx context.Context, slots } } close(blocks) - }() + }) return blocks, nil } @@ -176,172 +157,3 @@ func (c *EncodedLogCollector) BackfillForAddresses(ctx context.Context, addresse return sortedBlocks, cleanUp, nil } - -func (c *EncodedLogCollector) start(_ context.Context) error { - //c.engine.Go(c.runSlotPolling) - //c.engine.Go(c.runJobProcessing) - - return nil -} - -func (c *EncodedLogCollector) close() error { - return nil -} - -func (c *EncodedLogCollector) runSlotPolling(ctx context.Context) { - for { - // TODO: fetch slots using getBlocksWithLimit RPC Method - timer := time.NewTimer(DefaultNextSlotPollingInterval) - - select { - case <-ctx.Done(): - timer.Stop() - - return - case <-timer.C: - ctxB, cancel := context.WithTimeout(ctx, c.rpcTimeLimit) - - // not to be run as a job, but as a blocking call - result, err := c.client.GetLatestBlockhash(ctxB, rpc.CommitmentFinalized) - if err != nil { - c.lggr.Error("failed to get latest blockhash", "err", err) - cancel() - - continue - } - - cancel() - - slot := result.Context.Slot - // if the slot is not higher than the highest slot, skip it - if c.lastSentSlot.Load() >= slot { - continue - } - - select { - case c.chSlot <- slot: - c.lggr.Debugw("Fetched new slot and sent it for processing", "slot", slot) - c.lastSentSlot.Store(slot) - default: - } - } - - timer.Stop() - } 
-} - -//func (c *EncodedLogCollector) runSlotProcessing(ctx context.Context) { -// start := uint64(0) -// for { -// select { -// case <-ctx.Done(): -// return -// case end := <-c.chSlot: -// if start >= end { -// continue -// } -// -// if start == 0 { -// start = end // TODO: should be loaded from db or passed into EncodedLogCollector as arg -// } -// // load blocks in slot range -// if err := c.loadSlotBlocksRange(ctx, start, end); err != nil { -// // a retry will happen anyway on the next round of slots -// // so the error is handled by doing nothing -// c.lggr.Errorw("failed to load slot blocks range", "start", start, "end", end, "err", err) -// continue -// } -// -// start = end + 1 -// } -// } -//} - -//func (c *EncodedLogCollector) runBlockProcessing(ctx context.Context) { -// for { -// select { -// case <-ctx.Done(): -// return -// case slot := <-c.chBlock: -// if err := c.workers.Do(ctx, &getBlockJob{ -// slotNumber: slot, -// client: c.client, -// parser: c.ordered, -// chJobs: c.chJobs, -// }); err != nil { -// c.lggr.Errorf("failed to add job to queue: %s", err) -// } -// } -// } -//} - -func (c *EncodedLogCollector) runJobProcessing(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case job := <-c.chJobs: - if err := c.workers.Do(ctx, job); err != nil { - c.lggr.Errorf("failed to add job to queue: %s", err) - } - } - } -} - -//func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, end uint64) error { -// if start >= end { -// return errors.New("the start block must come before the end block") -// } -// -// var ( -// result rpc.BlocksResult -// err error -// ) -// -// rpcCtx, cancel := context.WithTimeout(ctx, c.rpcTimeLimit) -// defer cancel() -// -// if result, err = c.client.GetBlocks(rpcCtx, start, &end, rpc.CommitmentFinalized); err != nil { -// return err -// } -// -// c.lggr.Debugw("loaded blocks for slots range", "start", start, "end", end, "blocks", len(result)) -// -// // as a safety mechanism, order the blocks ascending (oldest to newest) in the extreme case -// // that the RPC changes and results get jumbled. 
-// slices.SortFunc(result, func(a, b uint64) int { -// if a < b { -// return -1 -// } else if a > b { -// return 1 -// } -// -// return 0 -// }) -// -// for _, block := range result { -// c.ordered.ExpectBlock(block) -// -// select { -// case <-ctx.Done(): -// return nil -// case c.chBlock <- block: -// } -// } -// -// return nil -//} - -type unorderedParser struct { - parser ProgramEventProcessor -} - -func newUnorderedParser(parser ProgramEventProcessor) *unorderedParser { - return &unorderedParser{parser: parser} -} - -func (p *unorderedParser) ExpectBlock(_ uint64) {} -func (p *unorderedParser) ExpectTxs(_ uint64, _ int) {} -func (p *unorderedParser) Process(block Block) error { - return p.parser.Process(block) -} diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index 4c386693e..4169b9a46 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -2,9 +2,13 @@ package logpoller import ( "context" + "database/sql" "errors" + "fmt" "time" + "github.com/gagliardetto/solana-go/rpc" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" ) @@ -19,36 +23,55 @@ type ORM interface { DeleteFilters(ctx context.Context, filters map[int64]Filter) error MarkFilterDeleted(ctx context.Context, id int64) (err error) MarkFilterBackfilled(ctx context.Context, id int64) (err error) + GetLatestBlock(ctx context.Context) (int64, error) +} + +type logsLoader interface { + BackfillForAddresses(ctx context.Context, addresses []PublicKey, fromSlot, toSlot uint64) (orderedBlocks <-chan Block, cleanUp func(), err error) } type LogPoller struct { services.Service eng *services.Engine - lggr logger.SugaredLogger - orm ORM + lggr logger.SugaredLogger + orm ORM + lastProcessedSlot int64 + client RPCClient + loader logsLoader filters *filters } -func New(lggr logger.SugaredLogger, orm ORM) *LogPoller { +func New(lggr logger.SugaredLogger, orm ORM, client RPCClient) *LogPoller { lggr = logger.Sugared(logger.Named(lggr, "LogPoller")) lp := &LogPoller{ orm: orm, lggr: lggr, filters: newFilters(lggr, orm), + client: client, } lp.Service, lp.eng = services.Config{ Name: "LogPollerService", Start: lp.start, + NewSubServices: func(l logger.Logger) []services.Service { + loader := NewEncodedLogCollector(client, lggr) + lp.loader = loader + return []services.Service{loader} + }, }.NewServiceEngine(lggr) lp.lggr = lp.eng.SugaredLogger return lp } func (lp *LogPoller) start(context.Context) error { - lp.eng.Go(lp.run) + lp.eng.GoTick(services.NewTicker(time.Second), func(ctx context.Context) { + err := lp.run(ctx) + if err != nil { + lp.lggr.Errorw("log poller tick failed", "err", err) + } + }) lp.eng.Go(lp.backgroundWorkerRun) return nil } @@ -87,38 +110,146 @@ func (lp *LogPoller) loadFilters(ctx context.Context) error { } } -func (lp *LogPoller) run(ctx context.Context) { - err := lp.loadFilters(ctx) +func (lp *LogPoller) getLastProcessedSlot(ctx context.Context) (int64, error) { + if lp.lastProcessedSlot != 0 { + return lp.lastProcessedSlot, nil + } + + latestDBBlock, err := lp.orm.GetLatestBlock(ctx) + if err == nil { + return latestDBBlock, nil + } + + if !errors.Is(err, sql.ErrNoRows) { + return 0, fmt.Errorf("error getting latest block from db: %w", err) + } + + latestBlock, err := lp.client.GetSlot(ctx, rpc.CommitmentFinalized) + if err != nil { + return 0, fmt.Errorf("error getting latest slot from RPC: %w", err) + } + + if latestBlock == 0 { + return 0, fmt.Errorf("latest finalized 
slot is 0 - waiting for next slot to start processing") + } + return int64(latestBlock) - 1, nil +} + +func (lp *LogPoller) backfillFilters(ctx context.Context, filters []Filter, to int64) error { + addressesSet := make(map[PublicKey]struct{}) + addresses := make([]PublicKey, 0, len(filters)) + minSlot := to + for _, filter := range filters { + if _, ok := addressesSet[filter.Address]; !ok { + addressesSet[filter.Address] = struct{}{} + addresses = append(addresses, filter.Address) + } + if filter.StartingBlock < minSlot { + minSlot = filter.StartingBlock + } + } + + err := lp.processBlocksRange(ctx, addresses, minSlot, to) if err != nil { - lp.lggr.Warnw("Failed loading filters", "err", err) - return + return err } - var blocks chan struct { - BlockNumber int64 - Logs any // to be defined + for _, filter := range filters { + err = errors.Join(err, lp.filters.MarkFilterBackfilled(ctx, filter.ID)) + } + + return err +} + +func (lp *LogPoller) processBlocksRange(ctx context.Context, addresses []PublicKey, from, to int64) error { + blocks, cleanup, err := lp.loader.BackfillForAddresses(ctx, addresses, uint64(from), uint64(to)) + if err != nil { + return fmt.Errorf("error backfilling addresses: %w", err) } + defer cleanup() +consumedAllBlocks: for { select { case <-ctx.Done(): - return - case block := <-blocks: - filtersToBackfill := lp.filters.GetFiltersToBackfill() - - // TODO: NONEVM-916 parse, filters and persist logs - // NOTE: removal of filters occurs in the separate goroutine, so there is a chance that upon insert - // of log corresponding filter won't be present in the db. Ensure to refilter and retry on insert error - for i := range filtersToBackfill { - filter := filtersToBackfill[i] - lp.eng.Go(func(ctx context.Context) { - lp.startFilterBackfill(ctx, filter, block.BlockNumber) - }) + return ctx.Err() + case block, ok := <-blocks: + if !ok { + break consumedAllBlocks + } + + batch := []Block{block} + batch = appendBuffered(blocks, batch) + err = lp.processBlocks(ctx, batch) + if err != nil { + return fmt.Errorf("error processing blocks: %w", err) + } + } + } + + return nil +} + +func appendBuffered(ch <-chan Block, blocks []Block) []Block { + for { + select { + case block, ok := <-ch: + if !ok { + return blocks } + + blocks = append(blocks, block) + default: + return blocks } } } +func (lp *LogPoller) processBlocks(ctx context.Context, blocks []Block) error { + // TODO: add logic implemented by NONEVM-916 + return nil +} + +func (lp *LogPoller) run(ctx context.Context) error { + err := lp.loadFilters(ctx) + if err != nil { + return fmt.Errorf("failed loading filters: %w", err) + } + + lastProcessedSlot, err := lp.getLastProcessedSlot(ctx) + if err != nil { + return fmt.Errorf("failed getting last processed slot: %w", err) + } + + filtersToBackfill := lp.filters.GetFiltersToBackfill() + if len(filtersToBackfill) != 0 { + lp.lggr.Debugw("Got new filters to backfill", "filters", filtersToBackfill) + return lp.backfillFilters(ctx, filtersToBackfill, lastProcessedSlot) + } + + addresses, err := lp.filters.GetDistinctAddresses(ctx) + if err != nil { + return fmt.Errorf("failed getting addresses: %w", err) + } + + highestSlot, err := lp.client.GetSlot(ctx, rpc.CommitmentFinalized) + if err != nil { + return fmt.Errorf("failed getting highest slot: %w", err) + } + + if lastProcessedSlot >= int64(highestSlot) { + return fmt.Errorf("last processed slot %d is not lower than the highest finalized RPC slot %d", lastProcessedSlot, highestSlot) + } + + err = lp.processBlocksRange(ctx, addresses, lastProcessedSlot, 
int64(highestSlot)) + if err != nil { + return fmt.Errorf("failed processing block range [%d, %d]: %w", lastProcessedSlot+1, highestSlot, err) + } + + lp.lastProcessedSlot = int64(highestSlot) + return nil +} + func (lp *LogPoller) backgroundWorkerRun(ctx context.Context) { pruneFilters := services.NewTicker(time.Minute) defer pruneFilters.Stop() @@ -134,12 +265,3 @@ func (lp *LogPoller) backgroundWorkerRun(ctx context.Context) { } } } - -func (lp *LogPoller) startFilterBackfill(ctx context.Context, filter Filter, toBlock int64) { - // TODO: NONEVM-916 start backfill - lp.lggr.Debugw("Starting filter backfill", "filter", filter) - err := lp.filters.MarkFilterBackfilled(ctx, filter.ID) - if err != nil { - lp.lggr.Errorw("Failed to mark filter backfill", "filter", filter, "err", err) - } -} diff --git a/pkg/solana/logpoller/orm.go b/pkg/solana/logpoller/orm.go index 2239ed608..7fc774681 100644 --- a/pkg/solana/logpoller/orm.go +++ b/pkg/solana/logpoller/orm.go @@ -225,3 +225,10 @@ func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, lim return logs, nil } + +func (o *DSORM) GetLatestBlock(ctx context.Context) (int64, error) { + q := `SELECT block_number FROM solana.logs WHERE chain_id = $1 ORDER BY block_number DESC LIMIT 1` + var result int64 + err := o.ds.GetContext(ctx, &result, q, o.chainID) + return result, err +} diff --git a/pkg/solana/logpoller/orm_test.go b/pkg/solana/logpoller/orm_test.go index 53512d696..e997c31f1 100644 --- a/pkg/solana/logpoller/orm_test.go +++ b/pkg/solana/logpoller/orm_test.go @@ -3,6 +3,7 @@ package logpoller import ( + "context" "testing" "time" @@ -206,6 +207,33 @@ func TestLogPollerLogs(t *testing.T) { require.Equal(t, log, dbLogs[0]) } +func TestLogPoller_GetLatestBlock(t *testing.T) { + lggr := logger.Test(t) + dbx := pg.NewTestDB(t, pg.TestURL(t)) + + createLogsForBlocks := func(ctx context.Context, orm *DSORM, blocks ...int64) { + filterID, err := orm.InsertFilter(ctx, newRandomFilter(t)) + require.NoError(t, err) + for _, block := range blocks { + log := newRandomLog(t, filterID, orm.chainID) + log.BlockNumber = block + err = orm.InsertLogs(ctx, []Log{log}) + require.NoError(t, err) + } + } + ctx := tests.Context(t) + orm1 := NewORM(uuid.NewString(), dbx, lggr) + createLogsForBlocks(tests.Context(t), orm1, 10, 11, 12) + orm2 := NewORM(uuid.NewString(), dbx, lggr) + createLogsForBlocks(context.Background(), orm2, 100, 110, 120) + latestBlockChain1, err := orm1.GetLatestBlock(ctx) + require.NoError(t, err) + require.Equal(t, int64(12), latestBlockChain1) + latestBlockChain2, err := orm2.GetLatestBlock(ctx) + require.NoError(t, err) + require.Equal(t, int64(120), latestBlockChain2) +} + func newRandomFilter(t *testing.T) Filter { return Filter{ Name: uuid.NewString(), diff --git a/pkg/solana/logpoller/worker.go b/pkg/solana/logpoller/worker.go index 907863a51..58d215fdc 100644 --- a/pkg/solana/logpoller/worker.go +++ b/pkg/solana/logpoller/worker.go @@ -63,7 +63,7 @@ type WorkerGroup struct { // dependencies and configuration maxWorkers int maxRetryCount uint8 - lggr logger.Logger + lggr logger.SugaredLogger // worker group state workers chan *worker @@ -80,7 +80,7 @@ type WorkerGroup struct { retryMap map[string]retryableJob } -func NewWorkerGroup(workers int, lggr logger.Logger) *WorkerGroup { +func NewWorkerGroup(workers int, lggr logger.SugaredLogger) *WorkerGroup { g := &WorkerGroup{ maxWorkers: workers, maxRetryCount: DefaultMaxRetryCount, @@ -202,12 +202,10 @@ func (g *WorkerGroup) runRetryQueue(ctx context.Context) { 
retry.count++ if retry.count > g.maxRetryCount { - g.lggr.Errorf("job %s dropped after max retries", job) - - continue + g.lggr.Criticalw("job exceeded max retries", "job", job, "retryCount", retry.count, "maxRetryCount", g.maxRetryCount) } - wait := calculateExponentialBackoff(retry.count) + wait := calculateExponentialBackoff(min(retry.count, g.maxRetryCount)) g.lggr.Errorf("retrying job in %dms", wait/time.Millisecond) retry.when = time.Now().Add(wait)
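
The worker change above stops dropping jobs after the retry limit and instead clamps the attempt number passed to calculateExponentialBackoff, so the wait stops growing once maxRetryCount is exceeded. The patch does not show calculateExponentialBackoff itself, so the doubling base delay below (assumedBackoff) is only an assumption used to illustrate the clamping effect, not the real implementation.

package main

import (
	"fmt"
	"time"
)

const maxRetryCount = 3 // illustrative; the real value is DefaultMaxRetryCount in worker.go

// assumedBackoff is a stand-in for calculateExponentialBackoff; the real
// implementation may use a different base delay or add jitter.
func assumedBackoff(count uint8) time.Duration {
	return time.Duration(1<<count) * 100 * time.Millisecond // 200ms, 400ms, 800ms, ...
}

func main() {
	for count := uint8(1); count <= 6; count++ {
		// min() mirrors the clamp introduced in runRetryQueue.
		wait := assumedBackoff(min(count, maxRetryCount))
		fmt.Printf("attempt %d waits %s\n", count, wait)
	}
	// Without the min() clamp the wait would keep doubling; with it, every
	// attempt past maxRetryCount reuses the maxRetryCount delay.
}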
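
The batching added to processBlocksRange relies on appendBuffered to drain whatever is already buffered in the blocks channel without blocking, so each processBlocks call handles a batch rather than one block per select iteration. A minimal, self-contained sketch of the same pattern (the drainBuffered name and the int payload are illustrative, not part of the patch):

package main

import "fmt"

// drainBuffered appends everything currently buffered in ch to batch and
// returns as soon as a receive would block, mirroring appendBuffered above.
func drainBuffered(ch <-chan int, batch []int) []int {
	for {
		select {
		case v, ok := <-ch:
			if !ok {
				return batch // channel closed: nothing more will arrive
			}
			batch = append(batch, v)
		default:
			return batch // nothing buffered: do not block
		}
	}
}

func main() {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2
	ch <- 3

	// Start the batch with one received item, then drain whatever else is ready.
	batch := []int{<-ch}
	batch = drainBuffered(ch, batch)
	fmt.Println(batch) // [1 2 3]
}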
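
A short note on the GetLatestBlock query: the ORDER BY ... LIMIT 1 form matters because getLastProcessedSlot only falls back to the RPC slot when the ORM returns sql.ErrNoRows, which is what an empty result set produces under database/sql and sqlx-style GetContext. An aggregate variant would instead return a single NULL row and fail the int64 scan with a different error, bypassing that fallback. The sketch below only contrasts the two query shapes; latestBlockMaxQuery is a hypothetical alternative, not something the patch uses.

package logpollernotes

const (
	// Used by the patch: returns sql.ErrNoRows when no logs exist for the chain.
	latestBlockQuery = `SELECT block_number FROM solana.logs WHERE chain_id = $1 ORDER BY block_number DESC LIMIT 1`
	// Hypothetical alternative: returns one NULL row on an empty table, which
	// would not satisfy the errors.Is(err, sql.ErrNoRows) check in getLastProcessedSlot.
	latestBlockMaxQuery = `SELECT MAX(block_number) FROM solana.logs WHERE chain_id = $1`
)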