Skip to content

Commit

Permalink
paginate eth_getLogs calls
Browse files Browse the repository at this point in the history
  • Loading branch information
feuGeneA committed Nov 15, 2023
1 parent f7f659c commit 4f06a3f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
35 changes: 20 additions & 15 deletions vms/evm/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ func (s *subscriber) forwardLogs() {
}

// Process logs from the given block height to the latest block
// Cap the number of blocks requested from the client to MaxBlocksToProcess,
// counting back from the current block.
// Paginates ethClient.FilterLogs requests into block ranges spanning
// MaxBlocksToProcess, counting back from the current block.
func (s *subscriber) ProcessFromHeight(height *big.Int) error {
s.logger.Info(
"Processing historical logs",
Expand All @@ -156,18 +156,6 @@ func (s *subscriber) ProcessFromHeight(height *big.Int) error {
return err
}

// Cap the number of blocks to process to MaxBlocksToProcess
toBlock := big.NewInt(0).SetUint64(latestBlock)
if height.Cmp(big.NewInt(0).Sub(toBlock, big.NewInt(MaxBlocksToProcess))) < 0 {
s.logger.Warn(
fmt.Sprintf("Requested to process too many blocks. Processing only the most recent %d blocks", MaxBlocksToProcess),
zap.String("requestedBlockHeight", height.String()),
zap.String("latestBlockHeight", toBlock.String()),
zap.String("chainID", s.chainID.String()),
)
height = big.NewInt(0).Add(toBlock, big.NewInt(-MaxBlocksToProcess))
}

// Filter logs from the latest processed block to the latest block
// Since initializationFilterQuery does not modify existing fields of warpFilterQuery,
// we can safely reuse warpFilterQuery with only a shallow copy
Expand Down Expand Up @@ -218,7 +206,24 @@ func (s *subscriber) ProcessFromHeight(height *big.Int) error {
return nil
}

return processBlockRange(height, toBlock)
bigLatestBlock := big.NewInt(0).SetUint64(latestBlock)

for fromBlock := height; fromBlock.Cmp(bigLatestBlock) <= 0; /*see post statement in body*/ {
toBlock := big.NewInt(0).Add(fromBlock, big.NewInt(MaxBlocksToProcess))
if toBlock.Cmp(bigLatestBlock) > 0 {
toBlock = bigLatestBlock
}

err = processBlockRange(fromBlock, toBlock)
if err != nil {
return err
}

// loop post statement:
fromBlock.Add(toBlock, big.NewInt(1))
}

return nil
}

func (s *subscriber) SetProcessedBlockHeightToLatest() error {
Expand Down
9 changes: 8 additions & 1 deletion vms/evm/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,14 @@ func TestCatchingUpOn300Blocks(t *testing.T) {
mockEthClient.EXPECT().FilterLogs(
gomock.Any(),
makeWarpBlockFilterQuery(
big.NewInt(800),
big.NewInt(700),
big.NewInt(900),
),
).Return([]types.Log{}, nil).Times(1)
mockEthClient.EXPECT().FilterLogs(
gomock.Any(),
makeWarpBlockFilterQuery(
big.NewInt(901),
big.NewInt(0).SetUint64(latestBlock),
),
).Return([]types.Log{}, nil).Times(1)
Expand Down

0 comments on commit 4f06a3f

Please sign in to comment.