From 866b2f47a8c582a68f3196ca2f240867a66e0f28 Mon Sep 17 00:00:00 2001 From: Dan Kanefsky <56059752+boojamya@users.noreply.github.com> Date: Thu, 1 Feb 2024 11:54:58 -0800 Subject: [PATCH] Make block channel size configurable (#45) * make block channel size configurable * feedback --- cmd/root_test.go | 18 ++++++++++++++++-- config/sample-config.yaml | 2 ++ noble/chain.go | 33 ++++++++++++++++++--------------- noble/config.go | 5 +++++ noble/listener.go | 6 +++++- types/message_state.go | 4 +++- 6 files changed, 49 insertions(+), 19 deletions(-) diff --git a/cmd/root_test.go b/cmd/root_test.go index 3fa440f..17ab405 100644 --- a/cmd/root_test.go +++ b/cmd/root_test.go @@ -14,12 +14,26 @@ func TestConfig(t *testing.T) { require.NoError(t, err, "Error parsing config") // assert noble chainConfig correctly parsed - var nobleType interface{} = file.Chains["noble"] + var nobleType any = file.Chains["noble"] _, ok := nobleType.(*noble.ChainConfig) require.True(t, ok) // assert ethereum chainConfig correctly parsed - var ethType interface{} = file.Chains["ethereum"] + var ethType any = file.Chains["ethereum"] _, ok = ethType.(*ethereum.ChainConfig) require.True(t, ok) } + +func TestBlockQueueChannelSize(t *testing.T) { + file, err := cmd.Parse("../config/sample-config.yaml") + require.NoError(t, err, "Error parsing config") + + var nobleCfg any = file.Chains["noble"] + n, ok := nobleCfg.(*noble.ChainConfig) + require.True(t, ok) + + // block-queue-channel-size is set to 1000000 in sample-config + expected := uint64(1000000) + + require.Equal(t, expected, n.BlockQueueChannelSize) +} diff --git a/config/sample-config.yaml b/config/sample-config.yaml index 868716b..58d9883 100644 --- a/config/sample-config.yaml +++ b/config/sample-config.yaml @@ -28,6 +28,8 @@ chains: broadcast-retries: 5 # number of times to attempt the broadcast broadcast-retry-interval: 5 # time between retries in seconds + block-queue-channel-size: 1000000 # 1000000 is a safe default, increase number if starting from a very early block + minter-private-key: # hex encoded privateKey # source domain id -> destination domain id diff --git a/noble/chain.go b/noble/chain.go index fd99374..e1c20d9 100644 --- a/noble/chain.go +++ b/noble/chain.go @@ -30,10 +30,11 @@ type Noble struct { lookbackPeriod uint64 workers uint32 - gasLimit uint64 - txMemo string - maxRetries int - retryIntervalSeconds int + gasLimit uint64 + txMemo string + maxRetries int + retryIntervalSeconds int + blockQueueChannelSize uint64 mu sync.Mutex } @@ -49,6 +50,7 @@ func NewChain( txMemo string, maxRetries int, retryIntervalSeconds int, + blockQueueChannelSize uint64, ) (*Noble, error) { cc, err := cosmos.NewProvider(rpcURL) if err != nil { @@ -66,17 +68,18 @@ func NewChain( minterAddress := sdk.MustBech32ifyAddressBytes("noble", address) return &Noble{ - cc: cc, - chainID: chainID, - startBlock: startBlock, - lookbackPeriod: lookbackPeriod, - workers: workers, - privateKey: &privKey, - minterAddress: minterAddress, - gasLimit: gasLimit, - txMemo: txMemo, - maxRetries: maxRetries, - retryIntervalSeconds: retryIntervalSeconds, + cc: cc, + chainID: chainID, + startBlock: startBlock, + lookbackPeriod: lookbackPeriod, + workers: workers, + privateKey: &privKey, + minterAddress: minterAddress, + gasLimit: gasLimit, + txMemo: txMemo, + maxRetries: maxRetries, + retryIntervalSeconds: retryIntervalSeconds, + blockQueueChannelSize: blockQueueChannelSize, }, nil } diff --git a/noble/config.go b/noble/config.go index a9085a1..bcad409 100644 --- a/noble/config.go +++ b/noble/config.go @@ -4,6 +4,8 @@ import "github.com/strangelove-ventures/noble-cctp-relayer/types" var _ types.ChainConfig = (*ChainConfig)(nil) +const defaultBlockQueueChannelSize = 1000000 + type ChainConfig struct { RPC string `yaml:"rpc"` ChainID string `yaml:"chain-id"` @@ -17,6 +19,8 @@ type ChainConfig struct { BroadcastRetries int `yaml:"broadcast-retries"` BroadcastRetryInterval int `yaml:"broadcast-retry-interval"` + BlockQueueChannelSize uint64 `yaml:"block-queue-channel-size"` + // TODO move to keyring MinterPrivateKey string `yaml:"minter-private-key"` } @@ -33,5 +37,6 @@ func (c *ChainConfig) Chain(name string) (types.Chain, error) { c.TxMemo, c.BroadcastRetries, c.BroadcastRetryInterval, + c.BlockQueueChannelSize, ) } diff --git a/noble/listener.go b/noble/listener.go index 546b749..138b402 100644 --- a/noble/listener.go +++ b/noble/listener.go @@ -40,7 +40,11 @@ func (n *Noble) StartListener( currentBlock := n.startBlock lookback := n.lookbackPeriod chainTip, err := n.chainTip(ctx) - blockQueue := make(chan uint64, 1000000) + + if n.blockQueueChannelSize == 0 { + n.blockQueueChannelSize = defaultBlockQueueChannelSize + } + blockQueue := make(chan uint64, n.blockQueueChannelSize) // history currentBlock = currentBlock - lookback diff --git a/types/message_state.go b/types/message_state.go index 3bfcd0d..c1535d0 100644 --- a/types/message_state.go +++ b/types/message_state.go @@ -51,7 +51,9 @@ type MessageState struct { // EvmLogToMessageState transforms an evm log into a messageState given an ABI func EvmLogToMessageState(abi abi.ABI, messageSent abi.Event, log *ethtypes.Log) (messageState *MessageState, err error) { event := make(map[string]interface{}) - _ = abi.UnpackIntoMap(event, messageSent.Name, log.Data) + if err = abi.UnpackIntoMap(event, messageSent.Name, log.Data); err != nil { + return nil, fmt.Errorf("unable to unpack evm log. error: %w", err) + } rawMessageSentBytes := event["message"].([]byte) message, _ := new(types.Message).Parse(rawMessageSentBytes)