Skip to content

Commit

Permalink
add state to chainlink message decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
iFrostizz committed Sep 19, 2024
1 parent 297b49c commit 99fe8c4
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 33 deletions.
20 changes: 14 additions & 6 deletions messages/chainlink/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@ import (
"fmt"
"strconv"

"github.com/ava-labs/avalanchego/ids"
"github.com/ethereum/go-ethereum/common"
)

type RawConfig struct {
AggregatorsToReplicas map[string]string `json:"aggregators-to-replicas"`
MaxFilterAdresses string `json:"max-filter-addresses"`
AggregatorsToReplicas map[string]string `json:"aggregators-to-replicas"`
MaxFilterAdresses string `json:"max-filter-addresses"`
DestinationBlockchainID string `json:"destination-blockchain-id"`
}

type Config struct {
AggregatorsToReplicas map[common.Address]common.Address
MaxFilterAdresses uint64
AggregatorsToReplicas map[common.Address]common.Address
MaxFilterAdresses uint64
DestinationBlockchainID ids.ID
}

func (c *RawConfig) Parse() (*Config, error) {
Expand All @@ -36,9 +39,14 @@ func (c *RawConfig) Parse() (*Config, error) {
}
aggregatorToReplicas[aggregator] = replica
}
destinationBlockchainID, err := ids.FromString(c.DestinationBlockchainID)
if err != nil {
return nil, err
}
config := Config{
AggregatorsToReplicas: aggregatorToReplicas,
MaxFilterAdresses: maxFilterAdresses,
AggregatorsToReplicas: aggregatorToReplicas,
MaxFilterAdresses: maxFilterAdresses,
DestinationBlockchainID: destinationBlockchainID,
}
return &config, nil
}
82 changes: 60 additions & 22 deletions messages/chainlink/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@ type factory struct {
}

type ChainlinkMessageHandler struct {
unsignedMessage *warp.UnsignedMessage
logger logging.Logger
maxFilterAdresses uint64
aggregatorsToReplicas map[common.Address]common.Address
aggregators []common.Address
unsignedMessage *warp.UnsignedMessage
logger logging.Logger
destinationBlockchainID ids.ID
maxFilterAdresses uint64
aggregatorsToReplicas map[common.Address]common.Address
aggregators []common.Address
}

type ChainlinkMessageDecoder struct {
handler *ChainlinkMessageHandler
aggregators []common.Address
}

type ChainlinkMessage struct {
Expand All @@ -57,6 +58,20 @@ type ChainlinkMessage struct {

var ChainlinkPriceUpdatedFilter = common.HexToHash("0559884fd3a460db3073b7fc896cc77986f16e378210ded43186175bf646fc5f")

func NewMessageDecoder(messageProtocolConfig config.MessageProtocolConfig) (*ChainlinkMessageDecoder, error) {
cfg, err := ParseConfig(messageProtocolConfig)
aggregators := make([]common.Address, len(cfg.AggregatorsToReplicas))

Check warning

Code scanning / CodeQL

Missing error check Warning

cfg
may be nil at this dereference because
err
may not have been checked.
for aggregator := range cfg.AggregatorsToReplicas {

Check warning

Code scanning / CodeQL

Missing error check Warning

cfg
may be nil at this dereference because
err
may not have been checked.
aggregators = append(aggregators, aggregator)
}
if err != nil {
return nil, err
}
return &ChainlinkMessageDecoder{
aggregators: aggregators,
}, nil
}

func (c ChainlinkMessageDecoder) Decode(
ctx context.Context,
header *subnetTypes.Header,
Expand All @@ -75,7 +90,7 @@ func (c ChainlinkMessageDecoder) Decode(
func() ([]subnetTypes.Log, error) {
return ethClient.FilterLogs(context.Background(), subnetInterfaces.FilterQuery{
Topics: [][]common.Hash{{ChainlinkPriceUpdatedFilter}},
Addresses: c.handler.aggregators,
Addresses: c.aggregators,
FromBlock: header.Number,
ToBlock: header.Number,
})
Expand Down Expand Up @@ -145,26 +160,33 @@ func ConvertToUnsignedMessage(msg *ChainlinkMessage) (*warp.UnsignedMessage, err
return warp.ParseUnsignedMessage(bytes)
}

func NewMessageHandlerFactory(
logger logging.Logger,
messageProtocolConfig config.MessageProtocolConfig,
) (messages.MessageHandlerFactory, error) {
func ParseConfig(messageProtocolConfig config.MessageProtocolConfig) (*Config, error) {
data, err := json.Marshal(messageProtocolConfig.Settings)
if err != nil {
logger.Error("Failed to marshal Teleporter config")
return nil, err
return nil, fmt.Errorf("Failed to marshal Teleporter config: %w", err)
}
var messageConfig RawConfig
if err := json.Unmarshal(data, &messageConfig); err != nil {
logger.Error("Failed to unmarshal Teleporter config")
return nil, err
return nil, fmt.Errorf("Failed to unmarshal Teleporter config: %w", err)
}

config, err := messageConfig.Parse()
if err != nil {
return nil, err
}

return config, nil
}

func NewMessageHandlerFactory(
logger logging.Logger,
messageProtocolConfig config.MessageProtocolConfig,
) (messages.MessageHandlerFactory, error) {
config, err := ParseConfig(messageProtocolConfig)
if err != nil {
return nil, err
}

return &factory{
logger: logger,
config: config,
Expand All @@ -179,11 +201,12 @@ func (f *factory) NewMessageHandler(unsignedMessage *warp.UnsignedMessage) (mess
}

return &ChainlinkMessageHandler{
logger: f.logger,
unsignedMessage: unsignedMessage,
maxFilterAdresses: f.config.MaxFilterAdresses,
aggregatorsToReplicas: aggregatorsToReplicas,
aggregators: aggregators,
logger: f.logger,
unsignedMessage: unsignedMessage,
destinationBlockchainID: f.config.DestinationBlockchainID,
maxFilterAdresses: f.config.MaxFilterAdresses,
aggregatorsToReplicas: aggregatorsToReplicas,
aggregators: aggregators,
}, nil
}

Expand Down Expand Up @@ -272,14 +295,29 @@ func (c *ChainlinkMessageHandler) SendMessage(
return txHash, nil
}

func (c *ChainlinkMessageHandler) GetMessageRoutingInfo() (
func (c *ChainlinkMessageHandler) GetMessageRoutingInfo(warpMessageInfo *relayerTypes.WarpMessageInfo) (
ids.ID,
common.Address,
ids.ID,
common.Address,
error,
) {
return ids.Empty, common.Address{}, ids.Empty, common.Address{}, nil
var msg ChainlinkMessage
err := json.Unmarshal(warpMessageInfo.UnsignedMessage.Payload, &msg)
if err != nil {
return ids.Empty, common.Address{}, ids.Empty, common.Address{}, err
}

replica, ok := c.aggregatorsToReplicas[msg.aggregator]
if !ok {
return ids.Empty, common.Address{}, ids.Empty, common.Address{}, fmt.Errorf("replica not found for aggregator %s", msg.aggregator)
}

return c.unsignedMessage.SourceChainID,
warpMessageInfo.SourceAddress,
c.destinationBlockchainID,
replica,
nil
}

func (c *ChainlinkMessageHandler) GetUnsignedMessage() *warp.UnsignedMessage {
Expand Down
2 changes: 1 addition & 1 deletion messages/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type MessageHandler interface {

// GetMessageRoutingInfo returns the source chain ID, origin sender address,
// destination chain ID, and destination address.
GetMessageRoutingInfo() (
GetMessageRoutingInfo(warpMessageInfo *relayerTypes.WarpMessageInfo) (
ids.ID,
common.Address,
ids.ID,
Expand Down
3 changes: 2 additions & 1 deletion messages/off-chain-registry/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
warpPayload "github.com/ava-labs/avalanchego/vms/platformvm/warp/payload"
"github.com/ava-labs/awm-relayer/messages"
"github.com/ava-labs/awm-relayer/relayer/config"
relayerTypes "github.com/ava-labs/awm-relayer/types"
"github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/subnet-evm/accounts/abi/bind"
"github.com/ava-labs/subnet-evm/ethclient"
Expand Down Expand Up @@ -195,7 +196,7 @@ func (m *messageHandler) SendMessage(
return txHash, nil
}

func (m *messageHandler) GetMessageRoutingInfo() (
func (m *messageHandler) GetMessageRoutingInfo(warpMessageInfo *relayerTypes.WarpMessageInfo) (
ids.ID,
common.Address,
ids.ID,
Expand Down
3 changes: 2 additions & 1 deletion messages/teleporter/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ava-labs/awm-relayer/messages"
pbDecider "github.com/ava-labs/awm-relayer/proto/pb/decider"
"github.com/ava-labs/awm-relayer/relayer/config"
relayerTypes "github.com/ava-labs/awm-relayer/types"
"github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/subnet-evm/accounts/abi/bind"
"github.com/ava-labs/subnet-evm/ethclient"
Expand Down Expand Up @@ -134,7 +135,7 @@ func (m *messageHandler) GetUnsignedMessage() *warp.UnsignedMessage {
return m.unsignedMessage
}

func (m *messageHandler) GetMessageRoutingInfo() (
func (m *messageHandler) GetMessageRoutingInfo(warpMessageInfo *relayerTypes.WarpMessageInfo) (
ids.ID,
common.Address,
ids.ID,
Expand Down
34 changes: 33 additions & 1 deletion relayer/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,11 @@ func main() {
logger.Fatal("Failed to create application relayers", zap.Error(err))
panic(err)
}
messagesDecoders := []messages.MessageDecoder{messages.WarpMessageDecoder{}}
messagesDecoders, err := createMessageDecoders(logger, &cfg)
if err != nil {
logger.Fatal("Failed to create application relayers", zap.Error(err))
panic(err)
}
messageCoordinator := relayer.NewMessageCoordinator(
logger,
messageHandlerFactories,
Expand Down Expand Up @@ -342,6 +346,34 @@ func createMessageHandlerFactories(
return messageHandlerFactories, nil
}

func createMessageDecoders(logger logging.Logger, globalConfig *config.Config) ([]messages.MessageDecoder, error) {
messageDecoders := make([]messages.MessageDecoder, 0)
for _, sourceBlockchain := range globalConfig.SourceBlockchains {
// Create message decoder for each supported message protocol
for _, cfg := range sourceBlockchain.MessageContracts {
format := cfg.MessageFormat
var (
m messages.MessageDecoder
err error
)
switch config.ParseMessageProtocol(format) {
case config.TELEPORTER, config.OFF_CHAIN_REGISTRY:
m = messages.WarpMessageDecoder{}
case config.CHAINLINK_PRICE_FEED:
m, err = chainlink.NewMessageDecoder(cfg)
default:
m, err = nil, fmt.Errorf("invalid message format %s", format)
}
if err != nil {
logger.Error("Failed to create message handler factory", zap.Error(err))
return nil, err
}
messageDecoders = append(messageDecoders, m)
}
}
return messageDecoders, nil
}

func createSourceClients(
ctx context.Context,
logger logging.Logger,
Expand Down
2 changes: 1 addition & 1 deletion relayer/message_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (mc *MessageCoordinator) getAppRelayerMessageHandler(

// Fetch the message delivery data
//nolint:lll
sourceBlockchainID, originSenderAddress, destinationBlockchainID, destinationAddress, err := messageHandler.GetMessageRoutingInfo()
sourceBlockchainID, originSenderAddress, destinationBlockchainID, destinationAddress, err := messageHandler.GetMessageRoutingInfo(warpMessageInfo)
if err != nil {
mc.logger.Error("Failed to get message routing information", zap.Error(err))
return nil, nil, err
Expand Down

0 comments on commit 99fe8c4

Please sign in to comment.