Skip to content

Commit

Permalink
write latest block to db on initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
cam-schultz committed Sep 5, 2023
1 parent 532094c commit f4d2c83
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 43 deletions.
43 changes: 28 additions & 15 deletions relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,29 +125,42 @@ func NewRelayer(
return nil, nil, err
}

// Get the latest processed block height from the database. If the database doesn't have a value for the latest block height,
// for this chain, return here without an error. This will cause the subscriber to begin processing new incoming warp messages.
latestSeenBlockData, err := r.db.Get(r.sourceChainID, []byte(database.LatestSeenBlockKey))
if errors.Is(err, database.ErrChainNotFound) {
// Get the latest processed block height from the database.
var (
latestSeenBlockData []byte
latestSeenBlock *big.Int // initialized to nil
)
latestSeenBlockData, err = r.db.Get(r.sourceChainID, []byte(database.LatestSeenBlockKey))

// The following cases are treated as successful:
// 1) The database contains the latest seen block data for the chain
// - In this case, we parse the block height and process warp logs from that height to the current block
// 2) The database has been configured for the chain, but does not contain the latest seen block data
// - In this case, we save the current block height in the database, but do not process any historical warp logs
if err == nil {
// If the database contains the latest seen block data, then back-process all warp messages from the
// latest seen block to the latest block
// This will query the node for any logs that match the filter query from the stored block height,
r.logger.Info("latest processed block", zap.String("block", string(latestSeenBlockData)))
var success bool
latestSeenBlock, success = new(big.Int).SetString(string(latestSeenBlockData), 10)
if !success {
r.logger.Error("failed to convert latest block to big.Int", zap.Error(err))
return nil, nil, err
}
} else if errors.Is(err, database.ErrChainNotFound) || errors.Is(err, database.ErrKeyNotFound) {
// Otherwise, latestSeenBlock is nil, so the call to ProcessFromHeight will simply update the database with the
// latest block height
logger.Info(
"Latest seen block not found in database. Starting from latest block.",
zap.String("chainID", r.sourceChainID.String()),
)
return &r, sub, nil
}
if err != nil {
} else {
r.logger.Warn("failed to get latest block from database", zap.Error(err))
return nil, nil, err
}
latestSeenBlock, success := new(big.Int).SetString(string(latestSeenBlockData), 10)
if !success {
r.logger.Error("failed to convert latest block to big.Int", zap.Error(err))
return nil, nil, err
}

// Back-process all warp messages from the latest seen block to the latest block
// This will query the node for any logs that match the filter query from the stored block height,
// and process the contained warp messages. If initialization fails, continue with normal relayer operation, but log the error.
// Process historical logs. If this fails for any reason, continue with normal relayer operation, but log the error.
err = sub.ProcessFromHeight(latestSeenBlock)
if err != nil {
logger.Warn(
Expand Down
63 changes: 35 additions & 28 deletions vms/evm/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ func (s *subscriber) forwardLogs() {
}
}

// Process logs from the given block height to the latest block
// If height is nil, then simply store the latest block height in the database,
// but do not process any logs
func (s *subscriber) ProcessFromHeight(height *big.Int) error {
ethClient, err := ethclient.Dial(s.nodeRPCURL)
if err != nil {
Expand All @@ -155,39 +158,43 @@ func (s *subscriber) ProcessFromHeight(height *big.Int) error {
return err
}

// Filter logs from the latest processed block to the latest block
// Since initializationFilterQuery does not modify existing fields of warpFilterQuery,
// we can safely reuse warpFilterQuery with only a shallow copy
initializationFilterQuery := interfaces.FilterQuery{
Topics: warpFilterQuery.Topics,
Addresses: warpFilterQuery.Addresses,
FromBlock: height,
}
logs, err := ethClient.FilterLogs(context.Background(), initializationFilterQuery)
if err != nil {
s.logger.Error(
"Failed to get logs on initialization",
zap.Error(err),
)
return err
}

// Queue each of the logs to be processed
s.logger.Info(
"Processing logs on initialization",
zap.String("fromBlockHeight", height.String()),
zap.String("toBlockHeight", strconv.Itoa(int(latestBlock))),
)
for _, log := range logs {
messageInfo, err := s.NewWarpLogInfo(log)
// Only process logs if the provided height is not nil. Otherwise, simply update the database with
// the latest block height
if height != nil {
// Filter logs from the latest processed block to the latest block
// Since initializationFilterQuery does not modify existing fields of warpFilterQuery,
// we can safely reuse warpFilterQuery with only a shallow copy
initializationFilterQuery := interfaces.FilterQuery{
Topics: warpFilterQuery.Topics,
Addresses: warpFilterQuery.Addresses,
FromBlock: height,
}
logs, err := ethClient.FilterLogs(context.Background(), initializationFilterQuery)
if err != nil {
s.logger.Error(
"Invalid log when processing from height. Continuing.",
"Failed to get logs on initialization",
zap.Error(err),
)
continue
return err
}

// Queue each of the logs to be processed
s.logger.Info(
"Processing logs on initialization",
zap.String("fromBlockHeight", height.String()),
zap.String("toBlockHeight", strconv.Itoa(int(latestBlock))),
)
for _, log := range logs {
messageInfo, err := s.NewWarpLogInfo(log)
if err != nil {
s.logger.Error(
"Invalid log when processing from height. Continuing.",
zap.Error(err),
)
continue
}
s.log <- *messageInfo
}
s.log <- *messageInfo
}

// Update the database with the latest seen block height
Expand Down

0 comments on commit f4d2c83

Please sign in to comment.