Skip to content

Commit

Permalink
Refactor listener; add flush (#58)
Browse files Browse the repository at this point in the history
* track height, handle clients, eth flush

* noble flush

* error and webocket disconnect

* organize eth listener

* latest block mutex

* fix start logic

* error sync

* update error sig

* proccessing queue and client

* processingQueue fix, flush flag, spelling

* height tracker use websocket + cleanup

* fix noble flush

* dont use package variables

* feedback
  • Loading branch information
boojamya authored Mar 21, 2024
1 parent fd47ac5 commit 0c4a797
Show file tree
Hide file tree
Showing 16 changed files with 576 additions and 172 deletions.
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@ Running the relayer
noble-cctp-relayer start --config ./config/sample-app-config.yaml
```
Sample configs can be found in [config](config).
### Promethius Metrics

### Flush Interval

Using the `--flush-interval` flag will run a flush on all paths every `duration`; ex `--flush-interval 5m`

The relayer will keep track of the latest flushed block. The first time the flush is run, the flush will start at the chains latest height - lookback period and flush up until height of the chain when the flush started. It will then store the height the flush ended on.

After that, it will flush from the last stored height - lookback period up until the latest height of the chain.

### Prometheus Metrics

By default, metrics are exported at on port :2112/metrics (`http://localhost:2112/metrics`). You can customize the port using the `--metrics-port` flag.

Expand Down
4 changes: 2 additions & 2 deletions cmd/appstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ func (a *AppState) loadConfigFile() {
}
config, err := ParseConfig(a.ConfigPath)
if err != nil {
a.Logger.Error("unable to parse config file", "location", a.ConfigPath, "err", err)
a.Logger.Error("Unable to parse config file", "location", a.ConfigPath, "err", err)
os.Exit(1)
}
a.Logger.Info("successfully parsed config file", "location", a.ConfigPath)
a.Logger.Info("Successfully parsed config file", "location", a.ConfigPath)
a.Config = config

}
12 changes: 7 additions & 5 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@ import (
)

const (
flagConfigPath = "config"
flagVerbose = "verbose"
flagLogLevel = "log-level"
flagJSON = "json"
flagMetricsPort = "metrics-port"
flagConfigPath = "config"
flagVerbose = "verbose"
flagLogLevel = "log-level"
flagJSON = "json"
flagMetricsPort = "metrics-port"
flagFlushInterval = "flush-interval"
)

func addAppPersistantFlags(cmd *cobra.Command, a *AppState) *cobra.Command {
cmd.PersistentFlags().StringVar(&a.ConfigPath, flagConfigPath, defaultConfigPath, "file path of config file")
cmd.PersistentFlags().BoolVarP(&a.Debug, flagVerbose, "v", false, fmt.Sprintf("use this flag to set log level to `debug` (overrides %s flag)", flagLogLevel))
cmd.PersistentFlags().StringVar(&a.LogLevel, flagLogLevel, "info", "log level (debug, info, warn, error)")
cmd.PersistentFlags().Int16P(flagMetricsPort, "p", 2112, "customize Prometheus metrics port")
cmd.PersistentFlags().DurationP(flagFlushInterval, "i", 0, "how frequently should a flush routine be run")
return cmd

}
Expand Down
40 changes: 39 additions & 1 deletion cmd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ func Start(a *AppState) *cobra.Command {
os.Exit(1)
}

flushInterval, err := cmd.Flags().GetDuration(flagFlushInterval)
if err != nil {
logger.Error("invalid flush interval", "error", err)
}
if flushInterval == 0 {
logger.Info("flush interval not set. Use the --flush-interval flag to set a reoccurring flush")
}

metrics := relayer.InitPromMetrics(port)

for name, cfg := range cfg.Chains {
Expand All @@ -63,12 +71,35 @@ func Start(a *AppState) *cobra.Command {
os.Exit(1)
}

logger = logger.With("name", c.Name(), "domain", c.Domain())

if err := c.InitializeClients(cmd.Context(), logger); err != nil {
logger.Error("error initializing client", "err", err)
os.Exit(1)
}

go c.TrackLatestBlockHeight(cmd.Context(), logger)

// wait until height is available
maxRetries := 45
for i := 0; i < maxRetries; i++ {
if c.LatestBlock() == 0 {
time.Sleep(1 * time.Second)
} else {
break
}
if i == maxRetries-1 {
logger.Error("Unable to get height")
os.Exit(1)
}
}

if err := c.InitializeBroadcaster(cmd.Context(), logger, sequenceMap); err != nil {
logger.Error("Error initializing broadcaster", "error", err)
os.Exit(1)
}

go c.StartListener(cmd.Context(), logger, processingQueue)
go c.StartListener(cmd.Context(), logger, processingQueue, flushInterval)
go c.WalletBalanceMetric(cmd.Context(), a.Logger, metrics)

if _, ok := registeredDomains[c.Domain()]; ok {
Expand All @@ -84,6 +115,13 @@ func Start(a *AppState) *cobra.Command {
go StartProcessor(cmd.Context(), a, registeredDomains, processingQueue, sequenceMap)
}

defer func() {
for _, c := range registeredDomains {
fmt.Printf("\n%s: latest-block: %d last-flushed-block: %d", c.Name(), c.LatestBlock(), c.LastFlushedBlock())
c.CloseClients()
}
}()

<-cmd.Context().Done()
},
}
Expand Down
12 changes: 3 additions & 9 deletions ethereum/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/strangelove-ventures/noble-cctp-relayer/ethereum/contracts"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
)
Expand All @@ -40,14 +39,9 @@ func (e *Ethereum) Broadcast(
sequenceMap *types.SequenceMap,
) error {

// set up eth client
client, err := ethclient.Dial(e.rpcURL)
if err != nil {
return fmt.Errorf("unable to dial ethereum client: %w", err)
}
defer client.Close()
logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain)

backend := NewContractBackendWrapper(client)
backend := NewContractBackendWrapper(e.rpcClient)

auth, err := bind.NewKeyedTransactorWithChainID(e.privateKey, big.NewInt(e.chainID))
if err != nil {
Expand Down Expand Up @@ -149,7 +143,7 @@ func (e *Ethereum) attemptBroadcast(
if response.Uint64() == uint64(1) {
// nonce has already been used, mark as complete
logger.Debug(fmt.Sprintf("This source domain/nonce has already been used: %d %d",
msg.SourceDomain, msg.Nonce))
msg.SourceDomain, msg.Nonce), "src-tx", msg.SourceTxHash, "reviever")
msg.Status = types.Complete
return nil
}
Expand Down
53 changes: 53 additions & 0 deletions ethereum/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package ethereum

import (
"bytes"
"context"
"crypto/ecdsa"
"embed"
"encoding/hex"
"fmt"
"strings"
"sync"

"cosmossdk.io/log"

"github.com/ethereum/go-ethereum/ethclient"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
)

Expand All @@ -17,6 +22,7 @@ var content embed.FS
var _ types.Chain = (*Ethereum)(nil)

type Ethereum struct {
// from conifg
name string
chainID int64
domain types.Domain
Expand All @@ -34,6 +40,12 @@ type Ethereum struct {
MetricsExponent int

mu sync.Mutex

wsClient *ethclient.Client
rpcClient *ethclient.Client

latestBlock uint64
lastFlushedBlock uint64
}

func NewChain(
Expand Down Expand Up @@ -83,6 +95,23 @@ func (e *Ethereum) Domain() types.Domain {
return e.domain
}

func (e *Ethereum) LatestBlock() uint64 {
e.mu.Lock()
block := e.latestBlock
e.mu.Unlock()
return block
}

func (e *Ethereum) SetLatestBlock(block uint64) {
e.mu.Lock()
e.latestBlock = block
e.mu.Unlock()
}

func (e *Ethereum) LastFlushedBlock() uint64 {
return e.lastFlushedBlock
}

func (e *Ethereum) IsDestinationCaller(destinationCaller []byte) bool {
zeroByteArr := make([]byte, 32)

Expand All @@ -96,3 +125,27 @@ func (e *Ethereum) IsDestinationCaller(destinationCaller []byte) bool {

return bytes.Equal(destinationCaller, zeroByteArr) || bytes.Equal(destinationCaller, decodedMinterPadded)
}

func (e *Ethereum) InitializeClients(ctx context.Context, logger log.Logger) error {
var err error

e.wsClient, err = ethclient.DialContext(ctx, e.wsURL)
if err != nil {
return fmt.Errorf("unable to initialize websocket ethereum client; err: %w", err)
}

e.rpcClient, err = ethclient.DialContext(ctx, e.rpcURL)
if err != nil {
return fmt.Errorf("unable to initialize rpc ethereum client; err: %w", err)
}
return nil
}

func (e *Ethereum) CloseClients() {
if e.wsClient != nil {
e.wsClient.Close()
}
if e.rpcClient != nil {
e.rpcClient.Close()
}
}
Loading

0 comments on commit 0c4a797

Please sign in to comment.