Skip to content

Commit

Permalink
test subscriber.ProcessFromHeight filter params
Browse files Browse the repository at this point in the history
  • Loading branch information
feuGeneA committed Nov 15, 2023
1 parent e0bab44 commit 1626219
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 3 deletions.
8 changes: 5 additions & 3 deletions vms/evm/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type subscriber struct {

logger logging.Logger
db database.RelayerDatabase
dial func(url string) (ethclient.Client, error)
}

// NewSubscriber returns a subscriber
Expand All @@ -83,6 +84,7 @@ func NewSubscriber(logger logging.Logger, subnetInfo config.SourceSubnet, db dat
logger: logger,
db: db,
logsChan: logs,
dial: ethclient.Dial,
}
}

Expand Down Expand Up @@ -138,7 +140,7 @@ func (s *subscriber) ProcessFromHeight(height *big.Int) error {
if height == nil {
return fmt.Errorf("cannot process logs from nil height")
}
ethClient, err := ethclient.Dial(s.nodeRPCURL)
ethClient, err := s.dial(s.nodeWSURL)
if err != nil {
return err
}
Expand Down Expand Up @@ -221,7 +223,7 @@ func (s *subscriber) SetProcessedBlockHeightToLatest() error {
"Updating latest processed block in database",
zap.String("chainID", s.chainID.String()),
)
ethClient, err := ethclient.Dial(s.nodeRPCURL)
ethClient, err := s.dial(s.nodeWSURL)
if err != nil {
s.logger.Error(
"Failed to dial node",
Expand Down Expand Up @@ -288,7 +290,7 @@ func (s *subscriber) Subscribe() error {
func (s *subscriber) dialAndSubscribe() error {
// Dial the configured source chain endpoint
// This needs to be a websocket
ethClient, err := ethclient.Dial(s.nodeWSURL)
ethClient, err := s.dial(s.nodeWSURL)
if err != nil {
return err
}
Expand Down
115 changes: 115 additions & 0 deletions vms/evm/subscriber_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package evm

import (
"math/big"
"os"
"testing"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/database"
mock_ethclient "github.com/ava-labs/awm-relayer/vms/evm/mocks"
"github.com/ava-labs/subnet-evm/core/types"
"github.com/ava-labs/subnet-evm/ethclient"
"github.com/ava-labs/subnet-evm/interfaces"
"github.com/ava-labs/subnet-evm/x/warp"
"github.com/ethereum/go-ethereum/common"
"go.uber.org/mock/gomock"
)

func makeSubscriberWithMockEthClient(t *testing.T) (subscriber, *mock_ethclient.MockClient) {
sourceSubnet := config.SourceSubnet{
SubnetID: "2TGBXcnwx5PqiXWiqxAKUaNSqDguXNh1mxnp82jui68hxJSZAx",
ChainID: "S4mMqUXe7vHsGiRAma6bv3CKnyaLssyAxmQ2KvFpX1KEvfFCD",
VM: config.EVM.String(),
APINodeHost: "127.0.0.1",
APINodePort: 9650,
EncryptConnection: false,
RPCEndpoint: "https://subnets.avax.network/mysubnet/rpc",
}

logger := logging.NewLogger(
"awm-relayer-test",
logging.NewWrappedCore(
logging.Info,
os.Stdout,
logging.JSON.ConsoleEncoder(),
),
)

subnetId, err := ids.FromString(sourceSubnet.ChainID)
if err != nil {
t.Fatalf("Failed to create subnet ID")
}

db, err := database.NewJSONFileStorage(logger, t.TempDir(), []ids.ID{subnetId})
if err != nil {
t.Fatalf("Failed to create JSON file storage")
}

mockEthClient := mock_ethclient.NewMockClient(gomock.NewController(t))
stockSubscriber := NewSubscriber(logger, sourceSubnet, db)
subscriberUnderTest := subscriber{
nodeWSURL: stockSubscriber.nodeWSURL,
nodeRPCURL: stockSubscriber.nodeRPCURL,
chainID: stockSubscriber.chainID,
logsChan: stockSubscriber.logsChan,
evmLog: stockSubscriber.evmLog,
logger: stockSubscriber.logger,
db: stockSubscriber.db,
dial: func(_url string) (ethclient.Client, error) { return mockEthClient, nil },
}

return subscriberUnderTest, mockEthClient
}

func expectProcessFromHeightFilterLogs(
mock *mock_ethclient.MockClient,
fromBlock int64,
toBlock int64,
) {
mock.EXPECT().FilterLogs(
gomock.Any(),
interfaces.FilterQuery{
Topics: [][]common.Hash{
{warp.WarpABI.Events["SendWarpMessage"].ID},
{},
{},
},
Addresses: []common.Address{
warp.ContractAddress,
},
FromBlock: big.NewInt(fromBlock),
ToBlock: big.NewInt(toBlock),
},
).Return([]types.Log{}, nil).Times(1)
}

func TestCatchingUpOn200Blocks(t *testing.T) {
subscriberUnderTest, mockEthClient := makeSubscriberWithMockEthClient(t)

latestBlock := int64(1000)
heightToProcessFrom := int64(800)

mockEthClient.EXPECT().BlockNumber(gomock.Any()).Return(uint64(latestBlock), nil).Times(1)
expectProcessFromHeightFilterLogs(mockEthClient, heightToProcessFrom, latestBlock)

subscriberUnderTest.ProcessFromHeight(big.NewInt(heightToProcessFrom))
}

func TestCatchingUpOn300Blocks(t *testing.T) {
subscriberUnderTest, mockEthClient := makeSubscriberWithMockEthClient(t)

// ask the subscriber to catch up on 300 blocks but observe that it
// only catches up on the most recent 200 blocks, since that's the max
// it will do.

latestBlock := int64(1000)
heightToProcessFrom := int64(700)

mockEthClient.EXPECT().BlockNumber(gomock.Any()).Return(uint64(latestBlock), nil).Times(1)
expectProcessFromHeightFilterLogs(mockEthClient, int64(800), latestBlock)

subscriberUnderTest.ProcessFromHeight(big.NewInt(heightToProcessFrom))
}

0 comments on commit 1626219

Please sign in to comment.