From 4f06a3f0935598ca156c288f37a562b7a66961bf Mon Sep 17 00:00:00 2001 From: "F. Eugene Aumson" Date: Wed, 15 Nov 2023 03:03:34 +0000 Subject: [PATCH] paginate eth_getLogs calls --- vms/evm/subscriber.go | 35 ++++++++++++++++++++--------------- vms/evm/subscriber_test.go | 9 ++++++++- 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/vms/evm/subscriber.go b/vms/evm/subscriber.go index 9a6266ce..854ca947 100644 --- a/vms/evm/subscriber.go +++ b/vms/evm/subscriber.go @@ -129,8 +129,8 @@ func (s *subscriber) forwardLogs() { } // Process logs from the given block height to the latest block -// Cap the number of blocks requested from the client to MaxBlocksToProcess, -// counting back from the current block. +// Paginates ethClient.FilterLogs requests into block ranges spanning +// MaxBlocksToProcess, counting back from the current block. func (s *subscriber) ProcessFromHeight(height *big.Int) error { s.logger.Info( "Processing historical logs", @@ -156,18 +156,6 @@ func (s *subscriber) ProcessFromHeight(height *big.Int) error { return err } - // Cap the number of blocks to process to MaxBlocksToProcess - toBlock := big.NewInt(0).SetUint64(latestBlock) - if height.Cmp(big.NewInt(0).Sub(toBlock, big.NewInt(MaxBlocksToProcess))) < 0 { - s.logger.Warn( - fmt.Sprintf("Requested to process too many blocks. Processing only the most recent %d blocks", MaxBlocksToProcess), - zap.String("requestedBlockHeight", height.String()), - zap.String("latestBlockHeight", toBlock.String()), - zap.String("chainID", s.chainID.String()), - ) - height = big.NewInt(0).Add(toBlock, big.NewInt(-MaxBlocksToProcess)) - } - // Filter logs from the latest processed block to the latest block // Since initializationFilterQuery does not modify existing fields of warpFilterQuery, // we can safely reuse warpFilterQuery with only a shallow copy @@ -218,7 +206,24 @@ func (s *subscriber) ProcessFromHeight(height *big.Int) error { return nil } - return processBlockRange(height, toBlock) + bigLatestBlock := big.NewInt(0).SetUint64(latestBlock) + + for fromBlock := height; fromBlock.Cmp(bigLatestBlock) <= 0; /*see post statement in body*/ { + toBlock := big.NewInt(0).Add(fromBlock, big.NewInt(MaxBlocksToProcess)) + if toBlock.Cmp(bigLatestBlock) > 0 { + toBlock = bigLatestBlock + } + + err = processBlockRange(fromBlock, toBlock) + if err != nil { + return err + } + + // loop post statement: + fromBlock.Add(toBlock, big.NewInt(1)) + } + + return nil } func (s *subscriber) SetProcessedBlockHeightToLatest() error { diff --git a/vms/evm/subscriber_test.go b/vms/evm/subscriber_test.go index 4236c284..009d9276 100644 --- a/vms/evm/subscriber_test.go +++ b/vms/evm/subscriber_test.go @@ -111,7 +111,14 @@ func TestCatchingUpOn300Blocks(t *testing.T) { mockEthClient.EXPECT().FilterLogs( gomock.Any(), makeWarpBlockFilterQuery( - big.NewInt(800), + big.NewInt(700), + big.NewInt(900), + ), + ).Return([]types.Log{}, nil).Times(1) + mockEthClient.EXPECT().FilterLogs( + gomock.Any(), + makeWarpBlockFilterQuery( + big.NewInt(901), big.NewInt(0).SetUint64(latestBlock), ), ).Return([]types.Log{}, nil).Times(1)