fixes to run perf test
dhaidashenko committed Jan 12, 2025
1 parent a1e1d42 commit dd8ff03
Showing 6 changed files with 765 additions and 744 deletions.
6 changes: 5 additions & 1 deletion pkg/solana/logpoller/blocks_sorter.go
@@ -27,7 +27,7 @@ type blocksSorter struct {
 	readyBlocks map[uint64]Block
 }
 
-func newBlocksSorter(inBlocks <-chan Block, lggr logger.Logger) (*blocksSorter, <-chan Block) {
+func newBlocksSorter(inBlocks <-chan Block, lggr logger.Logger, expectedBlocks []uint64) (*blocksSorter, <-chan Block) {
 	op := &blocksSorter{
 		queue:       list.New(),
 		readyBlocks: make(map[uint64]Block),
@@ -37,6 +37,10 @@ func newBlocksSorter(inBlocks <-chan Block, lggr logger.Logger) (*blocksSorter,
 		lggr:        lggr,
 	}
 
+	for _, b := range expectedBlocks {
+		op.queue.PushBack(b)
+	}
+
 	op.Service, op.engine = services.Config{
 		Name:  "blocksSorter",
 		Start: op.start,
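Note on the change above: newBlocksSorter now takes the full list of expected block heights up front and seeds its queue with them, instead of learning about blocks incrementally (the ExpectBlock path removed in loader.go below). A minimal standalone sketch of that queue-seeding idea, with illustrative names rather than the repo's types:

package main

import (
	"container/list"
	"fmt"
)

// sortByExpected releases values in the order given by expected, buffering
// any that arrive early; a toy version of the seeded-queue approach above.
func sortByExpected(in <-chan uint64, expected []uint64) <-chan uint64 {
	out := make(chan uint64)
	queue := list.New()
	for _, e := range expected {
		queue.PushBack(e)
	}
	go func() {
		defer close(out)
		arrived := make(map[uint64]bool)
		for v := range in {
			arrived[v] = true
			// emit from the queue head while the next expected value has arrived
			for queue.Len() > 0 && arrived[queue.Front().Value.(uint64)] {
				out <- queue.Remove(queue.Front()).(uint64)
			}
		}
	}()
	return out
}

func main() {
	in := make(chan uint64, 3)
	for _, v := range []uint64{12, 10, 11} { // arrives out of order
		in <- v
	}
	close(in)
	for v := range sortByExpected(in, []uint64{10, 11, 12}) {
		fmt.Println(v) // prints 10, 11, 12
	}
}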
4 changes: 2 additions & 2 deletions pkg/solana/logpoller/job_get_block.go
@@ -66,9 +66,9 @@ func (j *getBlockJob) Run(ctx context.Context) error {
 	events := make([]ProgramEvent, 0, len(block.Transactions))
 	for idx, txWithMeta := range block.Transactions {
 		detail.trxIdx = idx
-		tx, err := txWithMeta.GetParsedTransaction()
+		tx, err := txWithMeta.GetTransaction()
 		if err != nil {
-			return fmt.Errorf("parsing transaction %d in slot %d: %w", idx, txWithMeta.Slot, err)
+			return fmt.Errorf("failed to parse transaction %d in slot %d: %w", idx, txWithMeta.Slot, err)
 		}
 		if len(tx.Signatures) == 0 {
 			return fmt.Errorf("expected all transactions to have at least one signature %d in slot %d", idx, txWithMeta.Slot)
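The switch from GetParsedTransaction to GetTransaction decodes the raw binary transaction payload instead of the JSON-parsed form. A minimal sketch of that decode path, assuming the gagliardetto/solana-go client; the endpoint choice and slot selection are illustrative, and very recent blocks may additionally need GetBlockWithOpts with MaxSupportedTransactionVersion set:

package main

import (
	"context"
	"log"

	"github.com/gagliardetto/solana-go/rpc"
)

func main() {
	ctx := context.Background()
	client := rpc.New(rpc.MainNetBeta_RPC)

	slot, err := client.GetSlot(ctx, rpc.CommitmentFinalized)
	if err != nil {
		log.Fatalf("failed to get slot: %v", err)
	}

	block, err := client.GetBlock(ctx, slot)
	if err != nil {
		log.Fatalf("failed to get block %d: %v", slot, err)
	}

	for idx, txWithMeta := range block.Transactions {
		// GetTransaction decodes the raw binary transaction payload.
		tx, err := txWithMeta.GetTransaction()
		if err != nil {
			log.Fatalf("failed to parse transaction %d in slot %d: %v", idx, slot, err)
		}
		if len(tx.Signatures) == 0 {
			continue // mirror the repo's at-least-one-signature check
		}
		log.Printf("tx %d signature: %s", idx, tx.Signatures[0])
	}
}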
4 changes: 4 additions & 0 deletions pkg/solana/logpoller/job_get_slots_for_addr.go
@@ -30,6 +30,7 @@ func newGetSlotsForAddress(client RPCClient, workers *WorkerGroup, storeSlot fun
 		to:        to,
 		storeSlot: storeSlot,
 		workers:   workers,
+		done:      make(chan struct{}),
 	}
 }
 
@@ -84,6 +85,9 @@ func (f *getSlotsForAddressJob) run(ctx context.Context) (bool, error) {
 			continue
 		}
 
+		// TODO: ignore slots that are higher than to
+
 		lowestSlot = sig.Slot
 		f.storeSlot(sig.Slot)
 		if sig.Slot <= f.from {
 			return true, nil
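The new TODO records that signatures returned for an address can carry slots above the requested upper bound. A sketch of the bound check it calls for; the helper below is hypothetical, not the repo's code:

package main

import "fmt"

// filterSlots keeps only slots within [from, to]; a hypothetical helper
// sketching the guard the TODO above calls for.
func filterSlots(slots []uint64, from, to uint64) []uint64 {
	kept := make([]uint64, 0, len(slots))
	for _, s := range slots {
		if s < from || s > to {
			continue // outside the backfill window
		}
		kept = append(kept, s)
	}
	return kept
}

func main() {
	fmt.Println(filterSlots([]uint64{5, 10, 15, 20}, 8, 16)) // [10 15]
}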
212 changes: 103 additions & 109 deletions pkg/solana/logpoller/loader.go
@@ -2,9 +2,7 @@ package logpoller
 
 import (
 	"context"
-	"errors"
 	"fmt"
-	"slices"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -50,8 +48,6 @@ type EncodedLogCollector struct {
 
 	// dependencies and configuration
 	client       RPCClient
-	ordered      *blocksSorter
-	unordered    *unorderedParser
 	lggr         logger.Logger
 	rpcTimeLimit time.Duration
 
@@ -66,12 +62,10 @@ type EncodedLogCollector struct {
 
 func NewEncodedLogCollector(
 	client RPCClient,
-	parser ProgramEventProcessor,
 	lggr logger.Logger,
 ) *EncodedLogCollector {
 	c := &EncodedLogCollector{
 		client:    client,
-		unordered: newUnorderedParser(parser),
 		chSlot:    make(chan uint64),
 		chBlock:   make(chan uint64, 1),
 		chJobs:    make(chan Job, 1),
@@ -83,9 +77,8 @@ func NewEncodedLogCollector(
 		Name: "EncodedLogCollector",
 		NewSubServices: func(lggr logger.Logger) []services.Service {
 			c.workers = NewWorkerGroup(DefaultWorkerCount, lggr)
-			c.ordered = newBlocksSorter(lggr)
 
-			return []services.Service{c.workers, c.ordered}
+			return []services.Service{c.workers}
 		},
 		Start: c.start,
 		Close: c.close,
@@ -156,36 +149,37 @@ func (c *EncodedLogCollector) scheduleBlocksFetching(ctx context.Context, slots
 	return blocks, nil
 }
 
-func (c *EncodedLogCollector) BackfillForAddresses(ctx context.Context, addresses []PublicKey, fromSlot, toSlot uint64) (<-chan Block, error) {
+func (c *EncodedLogCollector) BackfillForAddresses(ctx context.Context, addresses []PublicKey, fromSlot, toSlot uint64) (orderedBlocks <-chan Block, cleanUp func(), err error) {
 	slotsToFetch, err := c.getSlotsToFetch(ctx, addresses, fromSlot, toSlot)
 	if err != nil {
-		return nil, fmt.Errorf("failed to identify slots to fetch: %w", err)
+		return nil, func() {}, fmt.Errorf("failed to identify slots to fetch: %w", err)
 	}
 
 	c.lggr.Debugw("Got all slots that need fetching for backfill operations", "addresses", PublicKeysToString(addresses), "fromSlot", fromSlot, "toSlot", toSlot, "slotsToFetch", slotsToFetch)
 
 	unorderedBlocks, err := c.scheduleBlocksFetching(ctx, slotsToFetch)
 	if err != nil {
-		return nil, fmt.Errorf("failed to schedule blocks to fetch: %w", err)
+		return nil, func() {}, fmt.Errorf("failed to schedule blocks to fetch: %w", err)
 	}
-	blocksSorter, sortedBlocks := newBlocksSorter(unorderedBlocks, c.lggr)
+	blocksSorter, sortedBlocks := newBlocksSorter(unorderedBlocks, c.lggr, slotsToFetch)
 	err = blocksSorter.Start(ctx)
 	if err != nil {
-		return nil, fmt.Errorf("failed to start blocks sorter: %w", err)
+		return nil, func() {}, fmt.Errorf("failed to start blocks sorter: %w", err)
 	}
-	defer func() {
-		err = blocksSorter.Close()
+
+	cleanUp = func() {
+		err := blocksSorter.Close()
 		if err != nil {
-			c.lggr.Error(fmt.Errorf("failed to close blocks sorter: %w", err))
+			blocksSorter.lggr.Errorw("Failed to close blocks sorter", "err", err)
 		}
-	}()
+	}
 
-	return sortedBlocks, nil
+	return sortedBlocks, cleanUp, nil
 }
 
 func (c *EncodedLogCollector) start(_ context.Context) error {
-	c.engine.Go(c.runSlotPolling)
-	c.engine.Go(c.runSlotProcessing)
-	c.engine.Go(c.runBlockProcessing)
-	c.engine.Go(c.runJobProcessing)
+	//c.engine.Go(c.runSlotPolling)
+	//c.engine.Go(c.runJobProcessing)
 
 	return nil
 }
@@ -236,50 +230,50 @@ func (c *EncodedLogCollector) runSlotPolling(ctx context.Context) {
 	}
 }
 
-func (c *EncodedLogCollector) runSlotProcessing(ctx context.Context) {
-	start := uint64(0)
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case end := <-c.chSlot:
-			if start >= end {
-				continue
-			}
-
-			if start == 0 {
-				start = end // TODO: should be loaded from db or passed into EncodedLogCollector as arg
-			}
-			// load blocks in slot range
-			if err := c.loadSlotBlocksRange(ctx, start, end); err != nil {
-				// a retry will happen anyway on the next round of slots
-				// so the error is handled by doing nothing
-				c.lggr.Errorw("failed to load slot blocks range", "start", start, "end", end, "err", err)
-				continue
-			}
-
-			start = end + 1
-		}
-	}
-}
-
-func (c *EncodedLogCollector) runBlockProcessing(ctx context.Context) {
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case slot := <-c.chBlock:
-			if err := c.workers.Do(ctx, &getBlockJob{
-				slotNumber: slot,
-				client:     c.client,
-				parser:     c.ordered,
-				chJobs:     c.chJobs,
-			}); err != nil {
-				c.lggr.Errorf("failed to add job to queue: %s", err)
-			}
-		}
-	}
-}
+//func (c *EncodedLogCollector) runSlotProcessing(ctx context.Context) {
+//	start := uint64(0)
+//	for {
+//		select {
+//		case <-ctx.Done():
+//			return
+//		case end := <-c.chSlot:
+//			if start >= end {
+//				continue
+//			}
+//
+//			if start == 0 {
+//				start = end // TODO: should be loaded from db or passed into EncodedLogCollector as arg
+//			}
+//			// load blocks in slot range
+//			if err := c.loadSlotBlocksRange(ctx, start, end); err != nil {
+//				// a retry will happen anyway on the next round of slots
+//				// so the error is handled by doing nothing
+//				c.lggr.Errorw("failed to load slot blocks range", "start", start, "end", end, "err", err)
+//				continue
+//			}
+//
+//			start = end + 1
+//		}
+//	}
+//}
+
+//func (c *EncodedLogCollector) runBlockProcessing(ctx context.Context) {
+//	for {
+//		select {
+//		case <-ctx.Done():
+//			return
+//		case slot := <-c.chBlock:
+//			if err := c.workers.Do(ctx, &getBlockJob{
+//				slotNumber: slot,
+//				client:     c.client,
+//				parser:     c.ordered,
+//				chJobs:     c.chJobs,
+//			}); err != nil {
+//				c.lggr.Errorf("failed to add job to queue: %s", err)
+//			}
+//		}
+//	}
+//}
 
 func (c *EncodedLogCollector) runJobProcessing(ctx context.Context) {
 	for {
@@ -294,49 +288,49 @@ func (c *EncodedLogCollector) runJobProcessing(ctx context.Context) {
 		}
 	}
 }
 
-func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, end uint64) error {
-	if start >= end {
-		return errors.New("the start block must come before the end block")
-	}
-
-	var (
-		result rpc.BlocksResult
-		err    error
-	)
-
-	rpcCtx, cancel := context.WithTimeout(ctx, c.rpcTimeLimit)
-	defer cancel()
-
-	if result, err = c.client.GetBlocks(rpcCtx, start, &end, rpc.CommitmentFinalized); err != nil {
-		return err
-	}
-
-	c.lggr.Debugw("loaded blocks for slots range", "start", start, "end", end, "blocks", len(result))
-
-	// as a safety mechanism, order the blocks ascending (oldest to newest) in the extreme case
-	// that the RPC changes and results get jumbled.
-	slices.SortFunc(result, func(a, b uint64) int {
-		if a < b {
-			return -1
-		} else if a > b {
-			return 1
-		}
-
-		return 0
-	})
-
-	for _, block := range result {
-		c.ordered.ExpectBlock(block)
-
-		select {
-		case <-ctx.Done():
-			return nil
-		case c.chBlock <- block:
-		}
-	}
-
-	return nil
-}
+//func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, end uint64) error {
+//	if start >= end {
+//		return errors.New("the start block must come before the end block")
+//	}
+//
+//	var (
+//		result rpc.BlocksResult
+//		err    error
+//	)
+//
+//	rpcCtx, cancel := context.WithTimeout(ctx, c.rpcTimeLimit)
+//	defer cancel()
+//
+//	if result, err = c.client.GetBlocks(rpcCtx, start, &end, rpc.CommitmentFinalized); err != nil {
+//		return err
+//	}
+//
+//	c.lggr.Debugw("loaded blocks for slots range", "start", start, "end", end, "blocks", len(result))
+//
+//	// as a safety mechanism, order the blocks ascending (oldest to newest) in the extreme case
+//	// that the RPC changes and results get jumbled.
+//	slices.SortFunc(result, func(a, b uint64) int {
+//		if a < b {
+//			return -1
+//		} else if a > b {
+//			return 1
+//		}
+//
+//		return 0
+//	})
+//
+//	for _, block := range result {
+//		c.ordered.ExpectBlock(block)
+//
+//		select {
+//		case <-ctx.Done():
+//			return nil
+//		case c.chBlock <- block:
+//		}
+//	}
+//
+//	return nil
+//}
 
 type unorderedParser struct {
 	parser ProgramEventProcessor
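BackfillForAddresses now returns an explicit cleanUp callback alongside the ordered block channel, instead of closing the sorter in a defer before the caller has consumed anything. A caller sketch under the new signature, assumed to sit alongside the logpoller package; every name other than BackfillForAddresses is illustrative:

package logpoller

import (
	"context"
	"fmt"
)

// consumeBackfill is a hypothetical caller showing the new three-value
// signature: drain the ordered channel, then release the sorter.
func consumeBackfill(ctx context.Context, c *EncodedLogCollector, addresses []PublicKey, fromSlot, toSlot uint64) error {
	blocks, cleanUp, err := c.BackfillForAddresses(ctx, addresses, fromSlot, toSlot)
	if err != nil {
		return fmt.Errorf("failed to start backfill: %w", err)
	}
	// close the blocks sorter once the channel has been fully drained
	defer cleanUp()

	for block := range blocks {
		_ = block // blocks arrive ordered by slot; process each one here
	}

	return nil
}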