Skip to content

Commit

Permalink
simplify blocks sorter
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaidashenko committed Jan 13, 2025
1 parent 6b785cf commit 8f5c0af
Showing 1 changed file with 14 additions and 25 deletions.
39 changes: 14 additions & 25 deletions pkg/solana/logpoller/blocks_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"container/list"
"context"
"sync"
"sync/atomic"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
Expand All @@ -16,9 +15,8 @@ type blocksSorter struct {
engine *services.Engine
lggr logger.Logger

inBlocks <-chan Block
wontReceiveNewBlocks atomic.Bool
receivedNewBlock chan struct{}
inBlocks <-chan Block
receivedNewBlock chan struct{}

outBlocks chan Block

Expand Down Expand Up @@ -50,16 +48,6 @@ func newBlocksSorter(inBlocks <-chan Block, lggr logger.Logger, expectedBlocks [
return op, op.outBlocks
}

// ExpectBlocks should be called in block order to preserve block progression.
func (p *blocksSorter) ExpectBlocks(blocks ...uint64) {
p.mu.Lock()
defer p.mu.Unlock()

for _, b := range blocks {
p.queue.PushBack(b)
}
}

func (p *blocksSorter) start(_ context.Context) error {
p.engine.Go(p.writeOrderedBlocks)
p.engine.Go(p.readBlocks)
Expand All @@ -73,7 +61,6 @@ func (p *blocksSorter) readBlocks(ctx context.Context) {
return
case block, ok := <-p.inBlocks:
if !ok {
p.wontReceiveNewBlocks.Store(true)
close(p.receivedNewBlock) // trigger last flush of ready blocks
return
}
Expand All @@ -95,8 +82,9 @@ func (p *blocksSorter) writeOrderedBlocks(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-p.receivedNewBlock:
if p.sendReadyBlocks(ctx) {
case _, ok := <-p.receivedNewBlock:
p.writeReadyBlocks(ctx, ok)
if !ok {
return
}
}
Expand All @@ -121,10 +109,8 @@ func (p *blocksSorter) readNextReadyBlock() *Block {
return &block
}

// sendReadyBlocks - sends all blocks in order defined by queue to the consumer.
// Returns true, when blocks provider (inBlocks) signaled that we won't receive any new blocks
// and we've sent all that were ready.
func (p *blocksSorter) sendReadyBlocks(ctx context.Context) bool {
// writeReadyBlocks - sends all blocks in order defined by queue to the consumer.
func (p *blocksSorter) writeReadyBlocks(ctx context.Context, mayHaveMoreWork bool) {
// start at the lowest block and find ready blocks
for {
block := p.readNextReadyBlock()
Expand All @@ -139,11 +125,14 @@ func (p *blocksSorter) sendReadyBlocks(ctx context.Context) bool {
}
}

if p.wontReceiveNewBlocks.Load() {
if mayHaveMoreWork {
return
}

p.mu.Lock()
defer p.mu.Unlock()
if p.queue.Len() == 0 {
// signal to consumer that work is done
close(p.outBlocks)
return true
}

return false
}

0 comments on commit 8f5c0af

Please sign in to comment.