diff --git a/README.md b/README.md index 8fddda0..9bdb3e2 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,16 @@ Running the relayer noble-cctp-relayer start --config ./config/sample-app-config.yaml ``` Sample configs can be found in [config](config). -### Promethius Metrics + +### Flush Interval + +Using the `--flush-interval` flag will run a flush on all paths every `duration`, e.g. `--flush-interval 5m`. + +The relayer keeps track of the latest flushed block. The first time the flush is run, it starts at the chain's latest height minus the lookback period and flushes up to the height the chain had when the flush started. It then stores the height the flush ended on. + +After that, each flush runs from the last stored height minus the lookback period up to the latest height of the chain. + +### Prometheus Metrics By default, metrics are exported at on port :2112/metrics (`http://localhost:2112/metrics`). You can customize the port using the `--metrics-port` flag. diff --git a/cmd/appstate.go b/cmd/appstate.go index 732c6d3..dc1f08f 100644 --- a/cmd/appstate.go +++ b/cmd/appstate.go @@ -63,10 +63,10 @@ func (a *AppState) loadConfigFile() { } config, err := ParseConfig(a.ConfigPath) if err != nil { - a.Logger.Error("unable to parse config file", "location", a.ConfigPath, "err", err) + a.Logger.Error("Unable to parse config file", "location", a.ConfigPath, "err", err) os.Exit(1) } - a.Logger.Info("successfully parsed config file", "location", a.ConfigPath) + a.Logger.Info("Successfully parsed config file", "location", a.ConfigPath) a.Config = config } diff --git a/cmd/flags.go b/cmd/flags.go index 884d81e..5bc3cbc 100644 --- a/cmd/flags.go +++ b/cmd/flags.go @@ -7,11 +7,12 @@ import ( ) const ( - flagConfigPath = "config" - flagVerbose = "verbose" - flagLogLevel = "log-level" - flagJSON = "json" - flagMetricsPort = "metrics-port" + flagConfigPath = "config" + flagVerbose = "verbose" + flagLogLevel = "log-level" + flagJSON = "json" + flagMetricsPort = "metrics-port" +
flagFlushInterval = "flush-interval" ) func addAppPersistantFlags(cmd *cobra.Command, a *AppState) *cobra.Command { @@ -19,6 +20,7 @@ func addAppPersistantFlags(cmd *cobra.Command, a *AppState) *cobra.Command { cmd.PersistentFlags().BoolVarP(&a.Debug, flagVerbose, "v", false, fmt.Sprintf("use this flag to set log level to `debug` (overrides %s flag)", flagLogLevel)) cmd.PersistentFlags().StringVar(&a.LogLevel, flagLogLevel, "info", "log level (debug, info, warn, error)") cmd.PersistentFlags().Int16P(flagMetricsPort, "p", 2112, "customize Prometheus metrics port") + cmd.PersistentFlags().DurationP(flagFlushInterval, "i", 0, "how frequently should a flush routine be run") return cmd } diff --git a/cmd/process.go b/cmd/process.go index b9c8125..c872022 100644 --- a/cmd/process.go +++ b/cmd/process.go @@ -54,6 +54,14 @@ func Start(a *AppState) *cobra.Command { os.Exit(1) } + flushInterval, err := cmd.Flags().GetDuration(flagFlushInterval) + if err != nil { + logger.Error("invalid flush interval", "error", err) + } + if flushInterval == 0 { + logger.Info("flush interval not set. 
Use the --flush-interval flag to set a reoccurring flush") + } + metrics := relayer.InitPromMetrics(port) for name, cfg := range cfg.Chains { @@ -63,12 +71,35 @@ func Start(a *AppState) *cobra.Command { os.Exit(1) } + logger = logger.With("name", c.Name(), "domain", c.Domain()) + + if err := c.InitializeClients(cmd.Context(), logger); err != nil { + logger.Error("error initializing client", "err", err) + os.Exit(1) + } + + go c.TrackLatestBlockHeight(cmd.Context(), logger) + + // wait until height is available + maxRetries := 45 + for i := 0; i < maxRetries; i++ { + if c.LatestBlock() == 0 { + time.Sleep(1 * time.Second) + } else { + break + } + if i == maxRetries-1 { + logger.Error("Unable to get height") + os.Exit(1) + } + } + if err := c.InitializeBroadcaster(cmd.Context(), logger, sequenceMap); err != nil { logger.Error("Error initializing broadcaster", "error", err) os.Exit(1) } - go c.StartListener(cmd.Context(), logger, processingQueue) + go c.StartListener(cmd.Context(), logger, processingQueue, flushInterval) go c.WalletBalanceMetric(cmd.Context(), a.Logger, metrics) if _, ok := registeredDomains[c.Domain()]; ok { @@ -84,6 +115,13 @@ func Start(a *AppState) *cobra.Command { go StartProcessor(cmd.Context(), a, registeredDomains, processingQueue, sequenceMap) } + defer func() { + for _, c := range registeredDomains { + fmt.Printf("\n%s: latest-block: %d last-flushed-block: %d", c.Name(), c.LatestBlock(), c.LastFlushedBlock()) + c.CloseClients() + } + }() + <-cmd.Context().Done() }, } diff --git a/ethereum/broadcast.go b/ethereum/broadcast.go index b752a14..c69e83d 100644 --- a/ethereum/broadcast.go +++ b/ethereum/broadcast.go @@ -14,7 +14,6 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethclient" "github.com/strangelove-ventures/noble-cctp-relayer/ethereum/contracts" "github.com/strangelove-ventures/noble-cctp-relayer/types" 
) @@ -40,14 +39,9 @@ func (e *Ethereum) Broadcast( sequenceMap *types.SequenceMap, ) error { - // set up eth client - client, err := ethclient.Dial(e.rpcURL) - if err != nil { - return fmt.Errorf("unable to dial ethereum client: %w", err) - } - defer client.Close() + logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain) - backend := NewContractBackendWrapper(client) + backend := NewContractBackendWrapper(e.rpcClient) auth, err := bind.NewKeyedTransactorWithChainID(e.privateKey, big.NewInt(e.chainID)) if err != nil { @@ -149,7 +143,7 @@ func (e *Ethereum) attemptBroadcast( if response.Uint64() == uint64(1) { // nonce has already been used, mark as complete logger.Debug(fmt.Sprintf("This source domain/nonce has already been used: %d %d", - msg.SourceDomain, msg.Nonce)) + msg.SourceDomain, msg.Nonce), "src-tx", msg.SourceTxHash, "reviever") msg.Status = types.Complete return nil } diff --git a/ethereum/chain.go b/ethereum/chain.go index 4bd8059..1b090b1 100644 --- a/ethereum/chain.go +++ b/ethereum/chain.go @@ -2,12 +2,17 @@ package ethereum import ( "bytes" + "context" "crypto/ecdsa" "embed" "encoding/hex" + "fmt" "strings" "sync" + "cosmossdk.io/log" + + "github.com/ethereum/go-ethereum/ethclient" "github.com/strangelove-ventures/noble-cctp-relayer/types" ) @@ -17,6 +22,7 @@ var content embed.FS var _ types.Chain = (*Ethereum)(nil) type Ethereum struct { + // from config name string chainID int64 domain types.Domain @@ -34,6 +40,12 @@ type Ethereum struct { MetricsExponent int mu sync.Mutex + + wsClient *ethclient.Client + rpcClient *ethclient.Client + + latestBlock uint64 + lastFlushedBlock uint64 } func NewChain( @@ -83,6 +95,23 @@ func (e *Ethereum) Domain() types.Domain { return e.domain } +func (e *Ethereum) LatestBlock() uint64 { + e.mu.Lock() + block := e.latestBlock + e.mu.Unlock() + return block +} + +func (e *Ethereum) SetLatestBlock(block uint64) { + e.mu.Lock() + e.latestBlock = block + e.mu.Unlock() +} + +func (e *Ethereum)
LastFlushedBlock() uint64 { + return e.lastFlushedBlock +} + func (e *Ethereum) IsDestinationCaller(destinationCaller []byte) bool { zeroByteArr := make([]byte, 32) @@ -96,3 +125,27 @@ func (e *Ethereum) IsDestinationCaller(destinationCaller []byte) bool { return bytes.Equal(destinationCaller, zeroByteArr) || bytes.Equal(destinationCaller, decodedMinterPadded) } + +func (e *Ethereum) InitializeClients(ctx context.Context, logger log.Logger) error { + var err error + + e.wsClient, err = ethclient.DialContext(ctx, e.wsURL) + if err != nil { + return fmt.Errorf("unable to initialize websocket ethereum client; err: %w", err) + } + + e.rpcClient, err = ethclient.DialContext(ctx, e.rpcURL) + if err != nil { + return fmt.Errorf("unable to initialize rpc ethereum client; err: %w", err) + } + return nil +} + +func (e *Ethereum) CloseClients() { + if e.wsClient != nil { + e.wsClient.Close() + } + if e.rpcClient != nil { + e.rpcClient.Close() + } +} diff --git a/ethereum/listener.go b/ethereum/listener.go index 9632dd2..06f0c72 100644 --- a/ethereum/listener.go +++ b/ethereum/listener.go @@ -10,165 +10,350 @@ import ( "cosmossdk.io/log" ethereum "github.com/ethereum/go-ethereum" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" "github.com/pascaldekloe/etherstream" "github.com/strangelove-ventures/noble-cctp-relayer/relayer" "github.com/strangelove-ventures/noble-cctp-relayer/types" ) +// errSignal broadcasts an error condition to multiple receivers by closing its Ready channel. +type errSignal struct { + Ready chan struct{} +} + +// StartListener starts the ethereum websocket subscription, queries history pertaining to the lookback period, +// and starts the recurring flush. +// +// If an error occurs in the websocket stream, this function stops the relevant subroutines and then re-runs itself.
func (e *Ethereum) StartListener( ctx context.Context, logger log.Logger, processingQueue chan *types.TxState, + flushInterval time.Duration, ) { logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain) - // set up client messageTransmitter, err := content.ReadFile("abi/MessageTransmitter.json") if err != nil { - logger.Error("unable to read MessageTransmitter abi", "err", err) + logger.Error("Unable to read MessageTransmitter abi", "err", err) os.Exit(1) } messageTransmitterABI, err := abi.JSON(bytes.NewReader(messageTransmitter)) if err != nil { - logger.Error("unable to parse MessageTransmitter abi", "err", err) + logger.Error("Unable to parse MessageTransmitter abi", "err", err) + os.Exit(1) } messageSent := messageTransmitterABI.Events["MessageSent"] + messageTransmitterAddress := common.HexToAddress(e.messageTransmitterAddress) - ethClient, err := ethclient.DialContext(ctx, e.wsURL) - if err != nil { - logger.Error("unable to initialize ethereum client", "err", err) - os.Exit(1) + sig := &errSignal{ + Ready: make(chan struct{}), } - messageTransmitterAddress := common.HexToAddress(e.messageTransmitterAddress) - etherReader := etherstream.Reader{Backend: ethClient} + // start main stream (does not account for lookback period or specific start block) + stream, sub, history := e.startMainStream(ctx, logger, messageSent, messageTransmitterAddress) - if e.startBlock == 0 { - header, err := ethClient.HeaderByNumber(ctx, nil) - if err != nil { - logger.Error("unable to retrieve latest eth block header", "err", err) - os.Exit(1) - } + go e.consumeStream(ctx, logger, processingQueue, messageSent, messageTransmitterABI, stream, sig) + consumeHistory(logger, history, processingQueue, messageSent, messageTransmitterABI) + + // get history from (start block - lookback) up until latest block + latestBlock := e.LatestBlock() + start := latestBlock + if e.startBlock != 0 { + start = e.startBlock + } + startLookback := start - e.lookbackPeriod + 
logger.Info(fmt.Sprintf("Getting history from %d: starting at: %d looking back %d blocks", startLookback, start, e.lookbackPeriod)) + e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, startLookback, latestBlock) - e.startBlock = header.Number.Uint64() + logger.Info("Finished getting history") + + if flushInterval > 0 { + go e.flushMechanism(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, flushInterval, sig) } + // listen for errors in the main websocket stream + // if error occurs, trigger sig.Ready + // This will cancel `consumeStream` and `flushMechanism` routines + select { + case <-ctx.Done(): + return + case err := <-sub.Err(): + logger.Error("Websocket disconnected. Reconnecting...", "err", err) + close(sig.Ready) + + // restart + e.startBlock = e.lastFlushedBlock + time.Sleep(10 * time.Millisecond) + e.StartListener(ctx, logger, processingQueue, flushInterval) + return + } +} + +func (e *Ethereum) startMainStream( + ctx context.Context, + logger log.Logger, + messageSent abi.Event, + messageTransmitterAddress common.Address, + +) (stream <-chan ethtypes.Log, sub ethereum.Subscription, history []ethtypes.Log) { + + var err error + + etherReader := etherstream.Reader{Backend: e.wsClient} + + latestBlock := e.LatestBlock() + + // start initial stream (start-block and lookback period handled separately) + logger.Info("Starting Ethereum listener") + query := ethereum.FilterQuery{ Addresses: []common.Address{messageTransmitterAddress}, Topics: [][]common.Hash{{messageSent.ID}}, - FromBlock: big.NewInt(int64(e.startBlock - e.lookbackPeriod)), + FromBlock: big.NewInt(int64(latestBlock)), + } + + queryAttempt := 1 + for { + // websockets do not query history + // https://github.com/ethereum/go-ethereum/issues/15063 + stream, sub, history, err = etherReader.QueryWithHistory(ctx, &query) + if err != nil { + logger.Error("Unable to subscribe to logs", 
"attempt", queryAttempt, "err", err) + queryAttempt++ + time.Sleep(1 * time.Second) + continue + } + break } - logger.Info(fmt.Sprintf( - "Starting Ethereum listener at block %d looking back %d blocks", - e.startBlock, - e.lookbackPeriod)) + return stream, sub, history +} - // websockets do not query history - // https://github.com/ethereum/go-ethereum/issues/15063 - stream, sub, history, err := etherReader.QueryWithHistory(ctx, &query) - if err != nil { - logger.Error("unable to subscribe to logs", "err", err) - os.Exit(1) +func (e *Ethereum) getAndConsumeHistory( + ctx context.Context, + logger log.Logger, + processingQueue chan *types.TxState, + messageSent abi.Event, + messageTransmitterAddress common.Address, + messageTransmitterABI abi.ABI, + start, end uint64) { + + var toUnSub ethereum.Subscription + var history []ethtypes.Log + var err error + + if start > end { + logger.Error(fmt.Sprintf("Unable to get history from %d to %d where the start block is greater than the end block", start, end)) + return } - // process history + // handle historical queries in chunks (some websockets only allow small history queries) + const chunkSize = uint64(100) + chunk := 1 + totalChunksNeeded := (end - start + chunkSize - 1) / chunkSize + + for start < end { + fromBlock := start + toBlock := start + chunkSize + if toBlock > end { + toBlock = end + } + + logger.Debug(fmt.Sprintf("Looking back in chunks of %d: chunk: %d/%d start-block: %d end-block: %d", chunkSize, chunk, totalChunksNeeded, fromBlock, toBlock)) + + etherReader := etherstream.Reader{Backend: e.wsClient} + + query := ethereum.FilterQuery{ + Addresses: []common.Address{messageTransmitterAddress}, + Topics: [][]common.Hash{{messageSent.ID}}, + FromBlock: big.NewInt(int64(fromBlock)), + ToBlock: big.NewInt(int64(toBlock)), + } + queryAttempt := 1 + for { + _, toUnSub, history, err = etherReader.QueryWithHistory(ctx, &query) + if err != nil { + // TODO: add metrics for this log + logger.Error(fmt.Sprintf("Unable 
to query history from %d to %d. attempt: %d", start, end, queryAttempt), "err", err) + queryAttempt++ + time.Sleep(1 * time.Second) + continue + } + break + } + toUnSub.Unsubscribe() + consumeHistory(logger, history, processingQueue, messageSent, messageTransmitterABI) + + start += chunkSize + chunk++ + } +} + +// consumeHistory consumes the history from a QueryWithHistory() go-ethereum call. +// It passes each successfully parsed message to the processingQueue; logs that fail to parse are skipped. +func consumeHistory( + logger log.Logger, + history []ethtypes.Log, + processingQueue chan *types.TxState, + messageSent abi.Event, + messageTransmitterABI abi.ABI, +) { for _, historicalLog := range history { parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &historicalLog) if err != nil { - logger.Error("Unable to parse history log into MessageState, skipping", "err", err) + logger.Error("Unable to parse history log into MessageState, skipping", "tx hash", historicalLog.TxHash.Hex(), "err", err) continue } logger.Info(fmt.Sprintf("New historical msg from source domain %d with tx hash %s", parsedMsg.SourceDomain, parsedMsg.SourceTxHash)) processingQueue <- &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} + } +} - // It might help to wait a small amount of time between sending messages into the processing queue - // so that account sequences / nonces are set correctly - // time.Sleep(10 * time.Millisecond) +// consumeStream consumes incoming transactions from a QueryWithHistory() go-ethereum call. +// If the websocket disconnects (sig.Ready is closed) or ctx is cancelled, it stops consuming and returns; StartListener re-establishes the stream.
+func (e *Ethereum) consumeStream( + ctx context.Context, + logger log.Logger, + processingQueue chan *types.TxState, + messageSent abi.Event, + messageTransmitterABI abi.ABI, + stream <-chan ethtypes.Log, + sig *errSignal, + +) { + logger.Info("Starting consumption of incoming stream") + var txState *types.TxState + for { + select { + case <-ctx.Done(): + return + case <-sig.Ready: + logger.Debug("Websocket disconnected... Stopped consuming stream. Will restart after websocket is re-established") + return + case streamLog := <-stream: + parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &streamLog) + if err != nil { + logger.Error("Unable to parse ws log into MessageState, skipping", "source tx", streamLog.TxHash.Hex(), "err", err) + continue + } + logger.Info(fmt.Sprintf("New stream msg from %d with tx hash %s", parsedMsg.SourceDomain, parsedMsg.SourceTxHash)) + if txState == nil { + txState = &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} + } else if parsedMsg.SourceTxHash != txState.TxHash { + processingQueue <- txState + txState = &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} + } else { + txState.Msgs = append(txState.Msgs, parsedMsg) + + } + default: + if txState != nil { + processingQueue <- txState + txState = nil + } + } } +} - // consume stream - go func() { - var txState *types.TxState - for { - select { - case <-ctx.Done(): - ethClient.Close() - return - case err := <-sub.Err(): - logger.Error("connection closed", "err", err) - ethClient.Close() - os.Exit(1) - case streamLog := <-stream: - parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &streamLog) - if err != nil { - logger.Error("Unable to parse ws log into MessageState, skipping") - continue - } - logger.Info(fmt.Sprintf("New stream msg from %d with tx hash %s", parsedMsg.SourceDomain, parsedMsg.SourceTxHash)) - if txState == nil { - txState = 
&types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} - } else if parsedMsg.SourceTxHash != txState.TxHash { - processingQueue <- txState - txState = &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} - } else { - txState.Msgs = append(txState.Msgs, parsedMsg) - - } - default: - if txState != nil { - processingQueue <- txState - txState = nil - } +func (e *Ethereum) flushMechanism( + ctx context.Context, + logger log.Logger, + processingQueue chan *types.TxState, + messageSent abi.Event, + messageTransmitterAddress common.Address, + messageTransmitterABI abi.ABI, + flushInterval time.Duration, + sig *errSignal, +) { + logger.Info(fmt.Sprintf("Starting flush mechanism. Will flush every %v", flushInterval)) + + for { + timer := time.NewTimer(flushInterval) + select { + case <-timer.C: + latestBlock := e.LatestBlock() + + if e.lastFlushedBlock == 0 { + e.lastFlushedBlock = latestBlock } + + start := e.lastFlushedBlock - e.lookbackPeriod + + logger.Info(fmt.Sprintf("Flush started from %d to %d", start, latestBlock)) + + e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, start, latestBlock) + + e.lastFlushedBlock = latestBlock + + logger.Info("Flush complete") + + // if main websocket stream is disconnected, stop flush. It will be restarted once websocket is reconnected + case <-sig.Ready: + timer.Stop() + logger.Debug("Websocket disconnected... Flush stopped. Will restart after websocket is re-established") + return + case <-ctx.Done(): + timer.Stop() + return + } + } +} + +func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger) { + logger.With("routine", "TrackLatestBlockHeight", "chain", e.name, "domain", e.domain) + + headers := make(chan *ethtypes.Header) + + sub, err := e.wsClient.SubscribeNewHead(ctx, headers) + if err != nil { + logger.Error("Failed to connect to websocket to track height. 
Will retry...", "err", err) + time.Sleep(1 * time.Second) + e.TrackLatestBlockHeight(ctx, logger) + return + } + + logger.Info("Height tracking websocket subscritpiton connected") + + for { + select { + case <-ctx.Done(): + return + case err := <-sub.Err(): + logger.Error("Height tracker websocket subscritpiton error. Attempting to reconnect...", "err", err) + e.TrackLatestBlockHeight(ctx, logger) + return + case header := <-headers: + e.SetLatestBlock(header.Number.Uint64()) } - }() + } } func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m *relayer.PromMetrics) { - logger = logger.With("metric", "wallet blance", "chain", e.name, "domain", e.domain) + logger = logger.With("metric", "wallet blannce", "chain", e.name, "domain", e.domain) queryRate := 30 * time.Second - var err error - var client *ethclient.Client - account := common.HexToAddress(e.minterAddress) exponent := big.NewInt(int64(e.MetricsExponent)) // ex: 18 scaleFactor := new(big.Float).SetInt(new(big.Int).Exp(big.NewInt(10), exponent, nil)) // ex: 10^18 - defer func() { - if client != nil { - client.Close() - } - }() - first := make(chan struct{}, 1) first <- struct{}{} - createClient := true for { timer := time.NewTimer(queryRate) select { // don't wait the "queryRate" amount of time if this is the first time running case <-first: timer.Stop() - if createClient { - client, err = ethclient.DialContext(ctx, e.rpcURL) - if err != nil { - logger.Error(fmt.Sprintf("error dialing eth client. Will try again in %d sec", queryRate), "error", err) - createClient = true - continue - } - } - balance, err := client.BalanceAt(ctx, account, nil) + balance, err := e.rpcClient.BalanceAt(ctx, account, nil) if err != nil { - logger.Error(fmt.Sprintf("error querying balance. Will try again in %d sec", queryRate), "error", err) - createClient = true + logger.Error(fmt.Sprintf("Error querying balance. 
Will try again in %.2f sec", queryRate.Seconds()), "error", err) continue } @@ -176,21 +361,10 @@ func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m balanceScaled, _ := new(big.Float).Quo(balanceBigFloat, scaleFactor).Float64() m.SetWalletBalance(e.name, e.minterAddress, e.MetricsDenom, balanceScaled) - - createClient = false case <-timer.C: - if createClient { - client, err = ethclient.DialContext(ctx, e.rpcURL) - if err != nil { - logger.Error(fmt.Sprintf("error dialing eth client. Will try again in %d sec", queryRate), "error", err) - createClient = true - continue - } - } - balance, err := client.BalanceAt(ctx, account, nil) + balance, err := e.rpcClient.BalanceAt(ctx, account, nil) if err != nil { - logger.Error(fmt.Sprintf("error querying balance. Will try again in %d sec", queryRate), "error", err) - createClient = true + logger.Error(fmt.Sprintf("Error querying balance. Will try again in %.2f sec", queryRate.Seconds()), "error", err) continue } @@ -199,7 +373,6 @@ func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m m.SetWalletBalance(e.name, e.minterAddress, e.MetricsDenom, balanceScaled) - createClient = false case <-ctx.Done(): timer.Stop() return diff --git a/ethereum/listener_test.go b/ethereum/listener_test.go index 256f378..7d43c39 100644 --- a/ethereum/listener_test.go +++ b/ethereum/listener_test.go @@ -27,7 +27,7 @@ func TestStartListener(t *testing.T) { processingQueue := make(chan *types.TxState, 10000) - go eth.StartListener(ctx, a.Logger, processingQueue) + go eth.StartListener(ctx, a.Logger, processingQueue, 0) time.Sleep(5 * time.Second) diff --git a/integration/eth_burn_to_noble_mint_test.go b/integration/eth_burn_to_noble_mint_test.go index 025e59f..181a58f 100644 --- a/integration/eth_burn_to_noble_mint_test.go +++ b/integration/eth_burn_to_noble_mint_test.go @@ -54,6 +54,11 @@ func TestEthBurnToNobleMint(t *testing.T) { ethChain, err := ethCfg.Chain("eth") require.NoError(t, err) 
+ err = nobleChain.InitializeClients(ctx, a.Logger) + require.NoError(t, err) + err = ethChain.InitializeClients(ctx, a.Logger) + require.NoError(t, err) + var burnAmount = big.NewInt(1) fmt.Println("Starting relayer...") @@ -67,7 +72,7 @@ func TestEthBurnToNobleMint(t *testing.T) { processingQueue := make(chan *types.TxState, 10) - go ethChain.StartListener(ctx, a.Logger, processingQueue) + go ethChain.StartListener(ctx, a.Logger, processingQueue, 0) go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap) _, _, generatedWallet := testdata.KeyTestPubAddr() diff --git a/integration/noble_burn_to_eth_mint_test.go b/integration/noble_burn_to_eth_mint_test.go index d52d05a..6f04edc 100644 --- a/integration/noble_burn_to_eth_mint_test.go +++ b/integration/noble_burn_to_eth_mint_test.go @@ -60,6 +60,11 @@ func TestNobleBurnToEthMint(t *testing.T) { ethChain, err := ethCfg.Chain("eth") require.NoError(t, err) + err = nobleChain.InitializeClients(ctx, a.Logger) + require.NoError(t, err) + err = ethChain.InitializeClients(ctx, a.Logger) + require.NoError(t, err) + fmt.Println("Starting relayer...") registeredDomains := make(map[types.Domain]types.Chain) @@ -72,7 +77,7 @@ func TestNobleBurnToEthMint(t *testing.T) { processingQueue := make(chan *types.TxState, 10) - go nobleChain.StartListener(ctx, a.Logger, processingQueue) + go nobleChain.StartListener(ctx, a.Logger, processingQueue, 0) go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap) ethDestinationAddress, _, err := generateEthWallet() diff --git a/noble/broadcast.go b/noble/broadcast.go index d96eafb..fb760ab 100644 --- a/noble/broadcast.go +++ b/noble/broadcast.go @@ -60,14 +60,14 @@ func (n *Noble) Broadcast( txBuilder := sdkContext.TxConfig.NewTxBuilder() // sign and broadcast txn - for attempt := 0; attempt <= n.maxRetries; attempt++ { + for attempt := 1; attempt <= n.maxRetries; attempt++ { err := n.attemptBroadcast(ctx, logger, msgs, sequenceMap, sdkContext, 
txBuilder) if err == nil { return nil } // Log retry information - logger.Error("Broadcasting to noble failed. Retrying...", "error", err, "interval_seconds", n.retryIntervalSeconds) + logger.Error(fmt.Sprintf("Broadcasting to noble failed. Attempt %d/%d Retrying...", attempt, n.maxRetries), "error", err, "interval_seconds", n.retryIntervalSeconds, "src-tx", msgs[0].SourceTxHash) time.Sleep(time.Duration(n.retryIntervalSeconds) * time.Second) } @@ -99,7 +99,7 @@ func (n *Noble) attemptBroadcast( if used { msg.Status = types.Complete - logger.Info(fmt.Sprintf("Noble cctp minter nonce %d already used", msg.Nonce)) + logger.Info(fmt.Sprintf("Noble cctp minter nonce %d already used.", msg.Nonce), "src-tx", msg.SourceTxHash) continue } diff --git a/noble/chain.go b/noble/chain.go index c7e4515..e269f35 100644 --- a/noble/chain.go +++ b/noble/chain.go @@ -8,6 +8,7 @@ import ( "fmt" "sync" + "cosmossdk.io/log" "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/bech32" @@ -19,17 +20,15 @@ import ( var _ types.Chain = (*Noble)(nil) type Noble struct { - cc *cosmos.CosmosProvider - chainID string - - privateKey *secp256k1.PrivKey - minterAddress string - accountNumber uint64 - - startBlock uint64 - lookbackPeriod uint64 - workers uint32 - + // from config + chainID string + rpcURL string + privateKey *secp256k1.PrivKey + minterAddress string + accountNumber uint64 + startBlock uint64 + lookbackPeriod uint64 + workers uint32 gasLimit uint64 txMemo string maxRetries int @@ -38,6 +37,11 @@ type Noble struct { minAmount uint64 mu sync.Mutex + + cc *cosmos.CosmosProvider + + latestBlock uint64 + lastFlushedBlock uint64 } func NewChain( @@ -54,11 +58,6 @@ func NewChain( blockQueueChannelSize uint64, minAmount uint64, ) (*Noble, error) { - cc, err := cosmos.NewProvider(rpcURL) - if err != nil { - return nil, fmt.Errorf("unable to build cosmos provider for noble: %w", err) - } - keyBz, err := 
hex.DecodeString(privateKey) if err != nil { return nil, fmt.Errorf("unable to parse noble private key: %w", err) @@ -70,8 +69,8 @@ func NewChain( minterAddress := sdk.MustBech32ifyAddressBytes("noble", address) return &Noble{ - cc: cc, chainID: chainID, + rpcURL: rpcURL, startBlock: startBlock, lookbackPeriod: lookbackPeriod, workers: workers, @@ -109,6 +108,23 @@ func (n *Noble) Domain() types.Domain { return 4 } +func (n *Noble) LatestBlock() uint64 { + n.mu.Lock() + block := n.latestBlock + n.mu.Unlock() + return block +} + +func (n *Noble) SetLatestBlock(block uint64) { + n.mu.Lock() + n.latestBlock = block + n.mu.Unlock() +} + +func (n *Noble) LastFlushedBlock() uint64 { + return n.lastFlushedBlock +} + func (n *Noble) IsDestinationCaller(destinationCaller []byte) bool { zeroByteArr := make([]byte, 32) @@ -136,3 +152,18 @@ func decodeDestinationCaller(input []byte) (string, error) { } return output, nil } + +func (n *Noble) InitializeClients(ctx context.Context, logger log.Logger) error { + var err error + n.cc, err = cosmos.NewProvider(n.rpcURL) + if err != nil { + return fmt.Errorf("unable to build cosmos provider for noble: %w", err) + } + return nil +} + +func (n *Noble) CloseClients() { + if n.cc != nil && n.cc.RPCClient.IsRunning() { + n.cc.RPCClient.Stop() + } +} diff --git a/noble/listener.go b/noble/listener.go index 461f64f..61099b3 100644 --- a/noble/listener.go +++ b/noble/listener.go @@ -10,20 +10,20 @@ import ( "github.com/strangelove-ventures/noble-cctp-relayer/types" ) +var flushInterval time.Duration + func (n *Noble) StartListener( ctx context.Context, logger log.Logger, processingQueue chan *types.TxState, + flushInterval_ time.Duration, ) { logger = logger.With("chain", n.Name(), "chain_id", n.chainID, "domain", n.Domain()) + flushInterval = flushInterval_ + if n.startBlock == 0 { - // get the latest block - chainTip, err := n.chainTip(ctx) - if err != nil { - panic(fmt.Errorf("unable to get chain tip for noble: %w", err)) - } - 
n.startBlock = chainTip + n.startBlock = n.LatestBlock() } logger.Info(fmt.Sprintf("Starting Noble listener at block %d looking back %d blocks", @@ -40,7 +40,7 @@ func (n *Noble) StartListener( // enqueue block heights currentBlock := n.startBlock lookback := n.lookbackPeriod - chainTip, err := n.chainTip(ctx) + chainTip := n.LatestBlock() if n.blockQueueChannelSize == 0 { n.blockQueueChannelSize = defaultBlockQueueChannelSize @@ -63,24 +63,20 @@ func (n *Noble) StartListener( select { case <-first: timer.Stop() - chainTip, err = n.chainTip(ctx) - if err == nil { - if chainTip >= currentBlock { - for i := currentBlock; i <= chainTip; i++ { - blockQueue <- i - } - currentBlock = chainTip + 1 + chainTip = n.LatestBlock() + if chainTip >= currentBlock { + for i := currentBlock; i <= chainTip; i++ { + blockQueue <- i } + currentBlock = chainTip + 1 } case <-timer.C: - chainTip, err = n.chainTip(ctx) - if err == nil { - if chainTip >= currentBlock { - for i := currentBlock; i <= chainTip; i++ { - blockQueue <- i - } - currentBlock = chainTip + 1 + chainTip = n.LatestBlock() + if chainTip >= currentBlock { + for i := currentBlock; i <= chainTip; i++ { + blockQueue <- i } + currentBlock = chainTip + 1 } case <-ctx.Done(): timer.Stop() @@ -100,7 +96,7 @@ func (n *Noble) StartListener( block := <-blockQueue res, err := n.cc.RPCClient.TxSearch(ctx, fmt.Sprintf("tx.height=%d", block), false, nil, nil, "") if err != nil || res == nil { - logger.Debug(fmt.Sprintf("unable to query Noble block %d", block), "error:", err) + logger.Debug(fmt.Sprintf("Unable to query Noble block %d. 
Will retry.", block), "error:", err) blockQueue <- block continue } @@ -108,7 +104,7 @@ func (n *Noble) StartListener( for _, tx := range res.Txs { parsedMsgs, err := txToMessageState(tx) if err != nil { - logger.Error("unable to parse Noble log to message state", "err", err.Error()) + logger.Error("Unable to parse Noble log to message state", "err", err.Error()) continue } for _, parsedMsg := range parsedMsgs { @@ -121,15 +117,87 @@ func (n *Noble) StartListener( }() } + if flushInterval > 0 { + go n.flushMechanism(ctx, logger, blockQueue) + } + <-ctx.Done() } -func (n *Noble) chainTip(ctx context.Context) (uint64, error) { +func (n *Noble) flushMechanism( + ctx context.Context, + logger log.Logger, + blockQueue chan uint64, +) { + + logger.Debug(fmt.Sprintf("Flush mechanism started. Will flush every %v", flushInterval)) + + for { + timer := time.NewTimer(flushInterval) + select { + case <-timer.C: + latestBlock := n.LatestBlock() + + // test to see that the rpc is available before attempting flush + res, err := n.cc.RPCClient.Status(ctx) + if err != nil { + logger.Error(fmt.Sprintf("Skipping flush... error reaching out to rpc, will retry flush in %v", flushInterval)) + continue + } + if res.SyncInfo.CatchingUp { + logger.Error(fmt.Sprintf("Skipping flush... 
rpc still catching, will retry flush in %v", flushInterval)) + continue + } + + if n.lastFlushedBlock == 0 { + n.lastFlushedBlock = latestBlock + } + lastFlushedBlock := n.lastFlushedBlock + + flushStart := lastFlushedBlock - n.lookbackPeriod + + logger.Info(fmt.Sprintf("Flush started from: %d to: %d", flushStart, latestBlock)) + + for i := flushStart; i <= latestBlock; i++ { + blockQueue <- i + } + n.lastFlushedBlock = latestBlock + + logger.Info("Flush complete") + + case <-ctx.Done(): + timer.Stop() + return + } + } +} + +func (n *Noble) TrackLatestBlockHeight(ctx context.Context, logger log.Logger) { + logger.With("routine", "TrackLatestBlockHeight", "chain", n.Name(), "domain", n.Domain()) + + // first time res, err := n.cc.RPCClient.Status(ctx) if err != nil { - return 0, fmt.Errorf("unable to query status for noble: %w", err) + logger.Error("Unable to query Nobles latest height", "err", err) + } + n.SetLatestBlock(uint64(res.SyncInfo.LatestBlockHeight)) + + // then start loop on a timer + for { + timer := time.NewTimer(6 * time.Second) + select { + case <-timer.C: + res, err := n.cc.RPCClient.Status(ctx) + if err != nil { + logger.Error("Unable to query Nobles latest height", "err", err) + continue + } + n.SetLatestBlock(uint64(res.SyncInfo.LatestBlockHeight)) + case <-ctx.Done(): + timer.Stop() + return + } } - return uint64(res.SyncInfo.LatestBlockHeight), nil } func (n *Noble) WalletBalanceMetric(ctx context.Context, logger log.Logger, m *relayer.PromMetrics) { diff --git a/noble/listener_test.go b/noble/listener_test.go index 5d7a615..0d886b5 100644 --- a/noble/listener_test.go +++ b/noble/listener_test.go @@ -26,7 +26,7 @@ func TestStartListener(t *testing.T) { processingQueue := make(chan *types.TxState, 10000) - go n.StartListener(ctx, a.Logger, processingQueue) + go n.StartListener(ctx, a.Logger, processingQueue, 0) time.Sleep(20 * time.Second) diff --git a/types/chain.go b/types/chain.go index f8f05c3..98a3873 100644 --- a/types/chain.go +++ 
b/types/chain.go @@ -2,6 +2,7 @@ package types import ( "context" + "time" "cosmossdk.io/log" "github.com/strangelove-ventures/noble-cctp-relayer/relayer" @@ -15,10 +16,29 @@ type Chain interface { // Domain returns the domain ID of the chain. Domain() Domain + // LatestBlock returns the last queried height of the chain + LatestBlock() uint64 + + // SetLatestBlock sets the latest block + SetLatestBlock(block uint64) + + // LastFlushedBlock returns the last block included in a flush. In the rare situation of a crash, + // this block is a good block to start at to catch up on any missed transactions. + LastFlushedBlock() uint64 + // IsDestinationCaller returns true if the specified destination caller is the minter for the specified domain OR // if destination caller is a zero byte array(left empty in deposit for burn message) IsDestinationCaller(destinationCaller []byte) bool + // InitializeClients initializes the rpc and/or websocket clients. + InitializeClients( + ctx context.Context, + logger log.Logger, + ) error + + // CloseClients is a cleanup function to close any open clients + CloseClients() + // InitializeBroadcaster initializes the minter account info for the chain. InitializeBroadcaster( ctx context.Context, @@ -31,6 +51,7 @@ type Chain interface { ctx context.Context, logger log.Logger, processingQueue chan *TxState, + flushInterval time.Duration, ) // Broadcast broadcasts CCTP mint messages to the chain. 
@@ -41,6 +62,11 @@ type Chain interface { sequenceMap *SequenceMap, ) error + TrackLatestBlockHeight( + ctx context.Context, + logger log.Logger, + ) + WalletBalanceMetric( ctx context.Context, logger log.Logger, diff --git a/types/message_state.go b/types/message_state.go index d112230..27c665e 100644 --- a/types/message_state.go +++ b/types/message_state.go @@ -79,7 +79,7 @@ func EvmLogToMessageState(abi abi.ABI, messageSent abi.Event, log *ethtypes.Log) return messageState, nil } - return nil, fmt.Errorf("unable to parse tx into message, tx hash %s", log.TxHash.Hex()) + return nil, fmt.Errorf("unable to parse tx into message, err: %w", err) } // Equal checks if two MessageState instances are equal