Skip to content

Commit

Permalink
test subscriber.ProcessFromHeight filter params
Browse files Browse the repository at this point in the history
  • Loading branch information
feuGeneA committed Nov 15, 2023
1 parent e0bab44 commit ec95b57
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 6 deletions.
44 changes: 41 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,15 +1,53 @@
### Build Stage ###
FROM golang:1.20.8-bullseye as build
FROM golang:1.20.10-bullseye as build_env

WORKDIR /go/src
# Copy the code into the container
COPY go.* .
RUN go mod download

FROM build_env AS build
COPY . .
RUN go mod tidy
# Build awm-relayer
RUN bash ./scripts/build.sh
RUN --mount=type=cache,target=/root/.cache/go-build \
bash ./scripts/build.sh

FROM build_env AS lint_env
COPY . .
RUN curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.55.2

FROM build_env AS e2e_test_env
ENV BASEDIR=/tmp/e2e-test
ENV AVALANCHEGO_BUILD_PATH=/tmp/e2e-test/avalanchego
COPY ./scripts/versions.sh ./scripts/versions.sh
SHELL ["/bin/bash", "-c"]
RUN source scripts/versions.sh && \
go install github.com/onsi/ginkgo/v2/ginkgo@$GINKGO_VERSION && \
git clone --depth 1 --branch v0.5.8 https://github.com/ava-labs/subnet-evm.git && \
cd subnet-evm && \
mkdir -p /tmp/e2e-test && \
./scripts/install_avalanchego_release.sh && \
./scripts/build.sh \
/tmp/e2e-test/avalanchego/plugins/srEXiWaHuhNyGwPUi444Tu47ZEDwxTWrbQiuD7FmgSAQ6X7Dy

FROM build_env AS test
COPY . .
RUN --mount=type=cache,target=/root/.cache/go-build \
go test ./...

FROM lint_env AS lint
COPY . .
RUN golangci-lint run --path-prefix=. --timeout 3m

FROM e2e_test_env AS test_e2e
COPY . .
ENV DATA_DIR=/tmp/e2e-test/data
RUN mkdir -p $DATA_DIR && ls $DATA_DIR
RUN ./scripts/e2e_test.sh

### RUN Stage ###
FROM golang:1.20.8
FROM golang:1.20.10
COPY --from=build /go/src/build/awm-relayer /usr/bin/awm-relayer
EXPOSE 8080
USER 1001
Expand Down
8 changes: 5 additions & 3 deletions vms/evm/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type subscriber struct {

logger logging.Logger
db database.RelayerDatabase
dial func(url string) (ethclient.Client, error)
}

// NewSubscriber returns a subscriber
Expand All @@ -83,6 +84,7 @@ func NewSubscriber(logger logging.Logger, subnetInfo config.SourceSubnet, db dat
logger: logger,
db: db,
logsChan: logs,
dial: ethclient.Dial,
}
}

Expand Down Expand Up @@ -138,7 +140,7 @@ func (s *subscriber) ProcessFromHeight(height *big.Int) error {
if height == nil {
return fmt.Errorf("cannot process logs from nil height")
}
ethClient, err := ethclient.Dial(s.nodeRPCURL)
ethClient, err := s.dial(s.nodeWSURL)
if err != nil {
return err
}
Expand Down Expand Up @@ -221,7 +223,7 @@ func (s *subscriber) SetProcessedBlockHeightToLatest() error {
"Updating latest processed block in database",
zap.String("chainID", s.chainID.String()),
)
ethClient, err := ethclient.Dial(s.nodeRPCURL)
ethClient, err := s.dial(s.nodeWSURL)
if err != nil {
s.logger.Error(
"Failed to dial node",
Expand Down Expand Up @@ -288,7 +290,7 @@ func (s *subscriber) Subscribe() error {
func (s *subscriber) dialAndSubscribe() error {
// Dial the configured source chain endpoint
// This needs to be a websocket
ethClient, err := ethclient.Dial(s.nodeWSURL)
ethClient, err := s.dial(s.nodeWSURL)
if err != nil {
return err
}
Expand Down
111 changes: 111 additions & 0 deletions vms/evm/subscriber_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package evm

import (
"math/big"
"os"
"testing"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/database"
mock_ethclient "github.com/ava-labs/awm-relayer/vms/evm/mocks"
"github.com/ava-labs/subnet-evm/core/types"
"github.com/ava-labs/subnet-evm/ethclient"
"github.com/ava-labs/subnet-evm/interfaces"
"github.com/ava-labs/subnet-evm/x/warp"
"github.com/ethereum/go-ethereum/common"
"go.uber.org/mock/gomock"
)

func makeSubscriberWithMockEthClient(t *testing.T) (subscriber, *mock_ethclient.MockClient) {
sourceSubnet := config.SourceSubnet{
SubnetID: "2TGBXcnwx5PqiXWiqxAKUaNSqDguXNh1mxnp82jui68hxJSZAx",
ChainID: "S4mMqUXe7vHsGiRAma6bv3CKnyaLssyAxmQ2KvFpX1KEvfFCD",
VM: config.EVM.String(),
APINodeHost: "127.0.0.1",
APINodePort: 9650,
EncryptConnection: false,
RPCEndpoint: "https://subnets.avax.network/mysubnet/rpc",
}

logger := logging.NewLogger(
"awm-relayer-test",
logging.NewWrappedCore(
logging.Info,
os.Stdout,
logging.JSON.ConsoleEncoder(),
),
)

subnetId, err := ids.FromString(sourceSubnet.ChainID)
if err != nil {
t.Fatalf("Failed to create subnet ID")
}

db, err := database.NewJSONFileStorage(logger, t.TempDir(), []ids.ID{subnetId})
if err != nil {
t.Fatalf("Failed to create JSON file storage")
}

mockController := gomock.NewController(t)
mockEthClient := mock_ethclient.NewMockClient(mockController)
stockSubscriber := NewSubscriber(logger, sourceSubnet, db)
subscriberUnderTest := subscriber{
nodeWSURL: stockSubscriber.nodeWSURL,
nodeRPCURL: stockSubscriber.nodeRPCURL,
chainID: stockSubscriber.chainID,
logsChan: stockSubscriber.logsChan,
evmLog: stockSubscriber.evmLog,
logger: stockSubscriber.logger,
db: stockSubscriber.db,
dial: func(_url string) (ethclient.Client, error) { return mockEthClient, nil },
}

return subscriberUnderTest, mockEthClient
}

func makeWarpBlockFilterQuery(fromBlock *big.Int, toBlock *big.Int) interfaces.FilterQuery {
return interfaces.FilterQuery{
Topics: [][]common.Hash{
{warp.WarpABI.Events["SendWarpMessage"].ID},
{},
{},
},
Addresses: []common.Address{
warp.ContractAddress,
},
FromBlock: fromBlock,
ToBlock: toBlock,
}
}

func TestCatchingUpOn200Blocks(t *testing.T) {
subscriberUnderTest, mockEthClient := makeSubscriberWithMockEthClient(t)

latestBlock := uint64(1000)
height := big.NewInt(800)
filterQuery := makeWarpBlockFilterQuery(height, big.NewInt(0).SetUint64(latestBlock))

mockEthClient.EXPECT().BlockNumber(gomock.Any()).Return(latestBlock, nil).Times(1)
mockEthClient.EXPECT().FilterLogs(gomock.Any(), filterQuery).Return([]types.Log{}, nil).Times(1)

subscriberUnderTest.ProcessFromHeight(height)
}

func TestCatchingUpOn300Blocks(t *testing.T) {
subscriberUnderTest, mockEthClient := makeSubscriberWithMockEthClient(t)

latestBlock := uint64(1000)
// ask the subscriber to catch up on 300 blocks but observe that it
// only catches up on the most recent 200 blocks, since that's the max
// it will do.
heightArgument := big.NewInt(700)
fromBlockFilterValue := big.NewInt(800)
filterQuery := makeWarpBlockFilterQuery(fromBlockFilterValue, big.NewInt(0).SetUint64(latestBlock))

mockEthClient.EXPECT().BlockNumber(gomock.Any()).Return(latestBlock, nil).Times(1)
mockEthClient.EXPECT().FilterLogs(gomock.Any(), filterQuery).Return([]types.Log{}, nil).Times(1)

subscriberUnderTest.ProcessFromHeight(heightArgument)
}

0 comments on commit ec95b57

Please sign in to comment.