Skip to content

Commit

Permalink
Merge pull request #450 from ava-labs/move-catchup-to-listener
Browse files Browse the repository at this point in the history
Listener catch-up fix
  • Loading branch information
iansuvak authored Aug 21, 2024
2 parents b0d6eaa + 17c6588 commit d104e65
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 11 deletions.
2 changes: 1 addition & 1 deletion relayer/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func newListener(
)
return nil, err
}
sub := vms.NewSubscriber(logger, config.ParseVM(sourceBlockchain.VM), blockchainID, ethWSClient)
sub := vms.NewSubscriber(logger, config.ParseVM(sourceBlockchain.VM), blockchainID, ethWSClient, ethRPCClient)

// Marks when the listener has finished the catch-up process on startup.
// Until that time, we do not know the order in which messages are processed,
Expand Down
45 changes: 38 additions & 7 deletions vms/evm/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ const (
maxClientSubscriptionBuffer = 20000
subscribeRetryTimeout = 1 * time.Second
MaxBlocksPerRequest = 200
rpcMaxRetries = 5
)

// subscriber implements Subscriber
type subscriber struct {
ethClient ethclient.Client
wsClient ethclient.Client
rpcClient ethclient.Client
blockchainID ids.ID
headers chan *types.Header
sub interfaces.Subscription
Expand All @@ -35,10 +37,16 @@ type subscriber struct {
}

// NewSubscriber returns a subscriber
func NewSubscriber(logger logging.Logger, blockchainID ids.ID, ethClient ethclient.Client) *subscriber {
func NewSubscriber(
logger logging.Logger,
blockchainID ids.ID,
wsClient ethclient.Client,
rpcClient ethclient.Client,
) *subscriber {
return &subscriber{
blockchainID: blockchainID,
ethClient: ethClient,
wsClient: wsClient,
rpcClient: rpcClient,
logger: logger,
headers: make(chan *types.Header, maxClientSubscriptionBuffer),
}
Expand All @@ -63,7 +71,7 @@ func (s *subscriber) ProcessFromHeight(height *big.Int, done chan bool) {
}

// Grab the latest block before filtering logs so we don't miss any before updating the db
latestBlockHeight, err := s.ethClient.BlockNumber(context.Background())
latestBlockHeight, err := s.rpcClient.BlockNumber(context.Background())
if err != nil {
s.logger.Error(
"Failed to get latest block",
Expand Down Expand Up @@ -103,10 +111,10 @@ func (s *subscriber) processBlockRange(
fromBlock, toBlock *big.Int,
) error {
for i := fromBlock.Int64(); i <= toBlock.Int64(); i++ {
header, err := s.ethClient.HeaderByNumber(context.Background(), big.NewInt(i))
header, err := s.getHeaderByNumberRetryable(big.NewInt(i))
if err != nil {
s.logger.Error(
"Failed to get header by number",
"Failed to get header by number after max attempts",
zap.String("blockchainID", s.blockchainID.String()),
zap.Error(err),
)
Expand All @@ -117,6 +125,29 @@ func (s *subscriber) processBlockRange(
return nil
}

func (s *subscriber) getHeaderByNumberRetryable(headerNumber *big.Int) (*types.Header, error) {
var err error
var header *types.Header
attempt := 1
for {
header, err = s.rpcClient.HeaderByNumber(context.Background(), headerNumber)
if err == nil {
return header, nil
}
s.logger.Warn(
"Failed to get header by number",
zap.String("blockchainID", s.blockchainID.String()),
zap.Int("attempt", attempt),
zap.Error(err),
)
if attempt >= rpcMaxRetries {
return nil, err
}
time.Sleep(subscribeRetryTimeout)
attempt++
}
}

// Loops forever iff maxResubscribeAttempts == 0
func (s *subscriber) Subscribe(maxResubscribeAttempts int) error {
// Retry subscribing until successful. Attempt to resubscribe maxResubscribeAttempts times
Expand Down Expand Up @@ -155,7 +186,7 @@ func (s *subscriber) Subscribe(maxResubscribeAttempts int) error {
}

func (s *subscriber) subscribe() error {
sub, err := s.ethClient.SubscribeNewHead(context.Background(), s.headers)
sub, err := s.wsClient.SubscribeNewHead(context.Background(), s.headers)
if err != nil {
s.logger.Error(
"Failed to subscribe to logs",
Expand Down
2 changes: 1 addition & 1 deletion vms/evm/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func makeSubscriberWithMockEthClient(t *testing.T) (*subscriber, *mock_ethclient
mockEthClient := mock_ethclient.NewMockClient(gomock.NewController(t))
blockchainID, err := ids.FromString(sourceSubnet.BlockchainID)
require.NoError(t, err)
subscriber := NewSubscriber(logger, blockchainID, mockEthClient)
subscriber := NewSubscriber(logger, blockchainID, mockEthClient, mockEthClient)

return subscriber, mockEthClient
}
Expand Down
10 changes: 8 additions & 2 deletions vms/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,16 @@ type Subscriber interface {
}

// NewSubscriber returns a concrete Subscriber according to the VM specified by [subnetInfo]
func NewSubscriber(logger logging.Logger, vm config.VM, blockchainID ids.ID, ethClient ethclient.Client) Subscriber {
func NewSubscriber(
logger logging.Logger,
vm config.VM,
blockchainID ids.ID,
ethWSClient ethclient.Client,
ethRPCClient ethclient.Client,
) Subscriber {
switch vm {
case config.EVM:
return evm.NewSubscriber(logger, blockchainID, ethClient)
return evm.NewSubscriber(logger, blockchainID, ethWSClient, ethRPCClient)
default:
return nil
}
Expand Down

0 comments on commit d104e65

Please sign in to comment.