diff --git a/relayer/listener.go b/relayer/listener.go index 6df5dc05..379af75f 100644 --- a/relayer/listener.go +++ b/relayer/listener.go @@ -106,7 +106,7 @@ func newListener( ) return nil, err } - sub := vms.NewSubscriber(logger, config.ParseVM(sourceBlockchain.VM), blockchainID, ethWSClient) + sub := vms.NewSubscriber(logger, config.ParseVM(sourceBlockchain.VM), blockchainID, ethWSClient, ethRPCClient) // Marks when the listener has finished the catch-up process on startup. // Until that time, we do not know the order in which messages are processed, diff --git a/vms/evm/subscriber.go b/vms/evm/subscriber.go index 8027baeb..672e9290 100644 --- a/vms/evm/subscriber.go +++ b/vms/evm/subscriber.go @@ -22,11 +22,13 @@ const ( maxClientSubscriptionBuffer = 20000 subscribeRetryTimeout = 1 * time.Second MaxBlocksPerRequest = 200 + rpcMaxRetries = 5 ) // subscriber implements Subscriber type subscriber struct { - ethClient ethclient.Client + wsClient ethclient.Client + rpcClient ethclient.Client blockchainID ids.ID headers chan *types.Header sub interfaces.Subscription @@ -35,10 +37,16 @@ type subscriber struct { } // NewSubscriber returns a subscriber -func NewSubscriber(logger logging.Logger, blockchainID ids.ID, ethClient ethclient.Client) *subscriber { +func NewSubscriber( + logger logging.Logger, + blockchainID ids.ID, + wsClient ethclient.Client, + rpcClient ethclient.Client, +) *subscriber { return &subscriber{ blockchainID: blockchainID, - ethClient: ethClient, + wsClient: wsClient, + rpcClient: rpcClient, logger: logger, headers: make(chan *types.Header, maxClientSubscriptionBuffer), } @@ -63,7 +71,7 @@ func (s *subscriber) ProcessFromHeight(height *big.Int, done chan bool) { } // Grab the latest block before filtering logs so we don't miss any before updating the db - latestBlockHeight, err := s.ethClient.BlockNumber(context.Background()) + latestBlockHeight, err := s.rpcClient.BlockNumber(context.Background()) if err != nil { s.logger.Error( "Failed to get latest block", @@ -103,10 +111,10 @@ func (s *subscriber) processBlockRange( fromBlock, toBlock *big.Int, ) error { for i := fromBlock.Int64(); i <= toBlock.Int64(); i++ { - header, err := s.ethClient.HeaderByNumber(context.Background(), big.NewInt(i)) + header, err := s.getHeaderByNumberRetryable(big.NewInt(i)) if err != nil { s.logger.Error( - "Failed to get header by number", + "Failed to get header by number after max attempts", zap.String("blockchainID", s.blockchainID.String()), zap.Error(err), ) @@ -117,6 +125,29 @@ func (s *subscriber) processBlockRange( return nil } +func (s *subscriber) getHeaderByNumberRetryable(headerNumber *big.Int) (*types.Header, error) { + var err error + var header *types.Header + attempt := 1 + for { + header, err = s.rpcClient.HeaderByNumber(context.Background(), headerNumber) + if err == nil { + return header, nil + } + s.logger.Warn( + "Failed to get header by number", + zap.String("blockchainID", s.blockchainID.String()), + zap.Int("attempt", attempt), + zap.Error(err), + ) + if attempt >= rpcMaxRetries { + return nil, err + } + time.Sleep(subscribeRetryTimeout) + attempt++ + } +} + // Loops forever iff maxResubscribeAttempts == 0 func (s *subscriber) Subscribe(maxResubscribeAttempts int) error { // Retry subscribing until successful. Attempt to resubscribe maxResubscribeAttempts times @@ -155,7 +186,7 @@ func (s *subscriber) Subscribe(maxResubscribeAttempts int) error { } func (s *subscriber) subscribe() error { - sub, err := s.ethClient.SubscribeNewHead(context.Background(), s.headers) + sub, err := s.wsClient.SubscribeNewHead(context.Background(), s.headers) if err != nil { s.logger.Error( "Failed to subscribe to logs", diff --git a/vms/evm/subscriber_test.go b/vms/evm/subscriber_test.go index f9661257..7a72b776 100644 --- a/vms/evm/subscriber_test.go +++ b/vms/evm/subscriber_test.go @@ -32,7 +32,7 @@ func makeSubscriberWithMockEthClient(t *testing.T) (*subscriber, *mock_ethclient mockEthClient := mock_ethclient.NewMockClient(gomock.NewController(t)) blockchainID, err := ids.FromString(sourceSubnet.BlockchainID) require.NoError(t, err) - subscriber := NewSubscriber(logger, blockchainID, mockEthClient) + subscriber := NewSubscriber(logger, blockchainID, mockEthClient, mockEthClient) return subscriber, mockEthClient } diff --git a/vms/subscriber.go b/vms/subscriber.go index 860caf08..91f90e60 100644 --- a/vms/subscriber.go +++ b/vms/subscriber.go @@ -39,10 +39,16 @@ type Subscriber interface { } // NewSubscriber returns a concrete Subscriber according to the VM specified by [subnetInfo] -func NewSubscriber(logger logging.Logger, vm config.VM, blockchainID ids.ID, ethClient ethclient.Client) Subscriber { +func NewSubscriber( + logger logging.Logger, + vm config.VM, + blockchainID ids.ID, + ethWSClient ethclient.Client, + ethRPCClient ethclient.Client, +) Subscriber { switch vm { case config.EVM: - return evm.NewSubscriber(logger, blockchainID, ethClient) + return evm.NewSubscriber(logger, blockchainID, ethWSClient, ethRPCClient) default: return nil }