diff --git a/vms/evm/subscriber.go b/vms/evm/subscriber.go index ef19c616..8c309b40 100644 --- a/vms/evm/subscriber.go +++ b/vms/evm/subscriber.go @@ -61,6 +61,7 @@ type subscriber struct { logger logging.Logger db database.RelayerDatabase + dial func (url string) (ethclient.Client, error) } // NewSubscriber returns a subscriber @@ -83,6 +84,7 @@ func NewSubscriber(logger logging.Logger, subnetInfo config.SourceSubnet, db dat logger: logger, db: db, logsChan: logs, + dial: ethclient.Dial, } } @@ -138,7 +140,7 @@ func (s *subscriber) ProcessFromHeight(height *big.Int) error { if height == nil { return fmt.Errorf("cannot process logs from nil height") } - ethClient, err := ethclient.Dial(s.nodeRPCURL) + ethClient, err := s.dial(s.nodeWSURL) if err != nil { return err } @@ -221,7 +223,7 @@ func (s *subscriber) SetProcessedBlockHeightToLatest() error { "Updating latest processed block in database", zap.String("chainID", s.chainID.String()), ) - ethClient, err := ethclient.Dial(s.nodeRPCURL) + ethClient, err := s.dial(s.nodeWSURL) if err != nil { s.logger.Error( "Failed to dial node", @@ -288,7 +290,7 @@ func (s *subscriber) Subscribe() error { func (s *subscriber) dialAndSubscribe() error { // Dial the configured source chain endpoint // This needs to be a websocket - ethClient, err := ethclient.Dial(s.nodeWSURL) + ethClient, err := s.dial(s.nodeWSURL) if err != nil { return err } diff --git a/vms/evm/subscriber_test.go b/vms/evm/subscriber_test.go new file mode 100644 index 00000000..cfcddd1c --- /dev/null +++ b/vms/evm/subscriber_test.go @@ -0,0 +1,111 @@ +package evm + +import ( + "math/big" + "os" + "testing" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/awm-relayer/config" + "github.com/ava-labs/awm-relayer/database" + mock_ethclient "github.com/ava-labs/awm-relayer/vms/evm/mocks" + "github.com/ava-labs/subnet-evm/core/types" + "github.com/ava-labs/subnet-evm/ethclient" + "github.com/ava-labs/subnet-evm/interfaces" + "github.com/ava-labs/subnet-evm/x/warp" + "github.com/ethereum/go-ethereum/common" + "go.uber.org/mock/gomock" +) + +func makeSubscriberWithMockEthClient(t *testing.T) (subscriber, *mock_ethclient.MockClient) { + sourceSubnet := config.SourceSubnet{ + SubnetID: "2TGBXcnwx5PqiXWiqxAKUaNSqDguXNh1mxnp82jui68hxJSZAx", + ChainID: "S4mMqUXe7vHsGiRAma6bv3CKnyaLssyAxmQ2KvFpX1KEvfFCD", + VM: config.EVM.String(), + APINodeHost: "127.0.0.1", + APINodePort: 9650, + EncryptConnection: false, + RPCEndpoint: "https://subnets.avax.network/mysubnet/rpc", + } + + logger := logging.NewLogger( + "awm-relayer-test", + logging.NewWrappedCore( + logging.Info, + os.Stdout, + logging.JSON.ConsoleEncoder(), + ), + ) + + subnetId, err := ids.FromString(sourceSubnet.ChainID) + if err != nil { + t.Fatalf("Failed to create subnet ID") + } + + db, err := database.NewJSONFileStorage(logger, t.TempDir(), []ids.ID{subnetId}) + if err != nil { + t.Fatalf("Failed to create JSON file storage") + } + + mockController := gomock.NewController(t) + mockEthClient := mock_ethclient.NewMockClient(mockController) + stockSubscriber := NewSubscriber(logger, sourceSubnet, db) + subscriberUnderTest := subscriber{ + nodeWSURL: stockSubscriber.nodeWSURL, + nodeRPCURL: stockSubscriber.nodeRPCURL, + chainID: stockSubscriber.chainID, + logsChan: stockSubscriber.logsChan, + evmLog: stockSubscriber.evmLog, + logger: stockSubscriber.logger, + db: stockSubscriber.db, + dial: func (_url string) (ethclient.Client, error) { return mockEthClient, nil }, + } + + return subscriberUnderTest, mockEthClient +} + +func makeWarpBlockFilterQuery(fromBlock *big.Int, toBlock *big.Int) interfaces.FilterQuery { + return interfaces.FilterQuery{ + Topics: [][]common.Hash{ + {warp.WarpABI.Events["SendWarpMessage"].ID}, + {}, + {}, + }, + Addresses: []common.Address{ + warp.ContractAddress, + }, + FromBlock: fromBlock, + ToBlock: toBlock, + } +} + +func TestCatchingUpOn200Blocks(t *testing.T) { + subscriberUnderTest, mockEthClient := makeSubscriberWithMockEthClient(t) + + latestBlock := uint64(1000) + height := big.NewInt(800) + filterQuery := makeWarpBlockFilterQuery(height, big.NewInt(0).SetUint64(latestBlock)) + + mockEthClient.EXPECT().BlockNumber(gomock.Any()).Return(latestBlock, nil).Times(1) + mockEthClient.EXPECT().FilterLogs(gomock.Any(), filterQuery).Return([]types.Log{}, nil).Times(1) + + subscriberUnderTest.ProcessFromHeight(height) +} + +func TestCatchingUpOn300Blocks(t *testing.T) { + subscriberUnderTest, mockEthClient := makeSubscriberWithMockEthClient(t) + + latestBlock := uint64(1000) + // ask the subscriber to catch up on 300 blocks but observe that it + // only catches up on the most recent 200 blocks, since that's the max + // it will do. + heightArgument := big.NewInt(700) + fromBlockFilterValue := big.NewInt(800) + filterQuery := makeWarpBlockFilterQuery(fromBlockFilterValue, big.NewInt(0).SetUint64(latestBlock)) + + mockEthClient.EXPECT().BlockNumber(gomock.Any()).Return(latestBlock, nil).Times(1) + mockEthClient.EXPECT().FilterLogs(gomock.Any(), filterQuery).Return([]types.Log{}, nil).Times(1) + + subscriberUnderTest.ProcessFromHeight(heightArgument) +}