diff --git a/config/config.go b/config/config.go index 5210ab78..01b3a547 100644 --- a/config/config.go +++ b/config/config.go @@ -237,6 +237,9 @@ func (s *SourceSubnet) Validate() error { return fmt.Errorf("invalid message contract address in EVM source subnet: %s", messageContractAddress) } } + case EVM_BLOCKHASH: + // No additional validation required + // TODONOW: we shouldn't require an address as the key for block hash publisher default: return fmt.Errorf("unsupported VM type for source subnet: %v", s.VM) } diff --git a/main/main.go b/main/main.go index c9ef56d5..333c3603 100644 --- a/main/main.go +++ b/main/main.go @@ -229,7 +229,7 @@ func runRelayer(logger logging.Logger, select { case txLog := <-subscriber.Logs(): logger.Info( - "Handling Teleporter submit message log.", + "Handling message log.", zap.String("txId", hex.EncodeToString(txLog.SourceTxID)), zap.String("originChainId", sourceSubnetInfo.ChainID), zap.String("destinationChainId", txLog.DestinationChainID.String()), diff --git a/messages/block_hash_publisher/config.go b/messages/block_hash_publisher/config.go index 807ce185..8a097f20 100644 --- a/messages/block_hash_publisher/config.go +++ b/messages/block_hash_publisher/config.go @@ -28,11 +28,14 @@ func (c *Config) Validate() error { for i, destinationInfo := range c.DestinationChains { // Check if the chainID is valid if _, err := ids.FromString(destinationInfo.ChainID); err != nil { - return errors.Wrap(err, fmt.Sprintf("invalid subnetID in block hash publisher configuration. Provided ID: %s", destinationInfo.ChainID)) + return errors.Wrap(err, fmt.Sprintf("invalid chainID in block hash publisher configuration. Provided ID: %s", destinationInfo.ChainID)) } // Check if the address is valid addr := destinationInfo.Address + if addr == "" { + return errors.New("empty address in block hash publisher configuration") + } if strings.HasPrefix(addr, "0x") { addr = addr[2:] } diff --git a/messages/block_hash_publisher/message_manager.go b/messages/block_hash_publisher/message_manager.go index f79030fc..a77199ff 100644 --- a/messages/block_hash_publisher/message_manager.go +++ b/messages/block_hash_publisher/message_manager.go @@ -17,7 +17,7 @@ import ( ) const ( - publishBlockHashGasLimit = 100000 // TODONOW: set the correct gas limit + publishBlockHashGasLimit = 1000000 // TODONOW: set the correct gas limit ) type destinationSenderInfo struct { @@ -45,7 +45,7 @@ func NewMessageManager( messageProtocolConfig config.MessageProtocolConfig, destinationClients map[ids.ID]vms.DestinationClient, ) (*messageManager, error) { - // Marshal the map and unmarshal into the Teleporter config + // Marshal the map and unmarshal into the Block Hash Publisher config data, err := json.Marshal(messageProtocolConfig.Settings) if err != nil { logger.Error("Failed to marshal Block Hash Publisher config") @@ -64,6 +64,7 @@ func NewMessageManager( ) return nil, err } + logger.Info("DEBUG CONFIG", zap.String("config", fmt.Sprintf("%#v", messageConfig))) destinations := make(map[ids.ID]*destinationSenderInfo) for _, destination := range messageConfig.DestinationChains { @@ -78,9 +79,12 @@ func NewMessageManager( destinations[destinationID] = &destinationSenderInfo{ useTimeInterval: destination.useTimeInterval, timeIntervalSeconds: uint64(destination.timeIntervalSeconds), + blockInterval: uint64(destination.blockInterval), address: common.HexToAddress(destination.Address), client: destinationClients[destinationID], } + logger.Info("DEBUG DESTINATIONS", zap.String("address", destinations[destinationID].address.String())) + } return &messageManager{ @@ -93,6 +97,15 @@ func NewMessageManager( func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) (bool, error) { destination, ok := m.destinations[destinationChainID] if !ok { + var destinationIDs []string + for id := range m.destinations { + destinationIDs = append(destinationIDs, id.String()) + } + m.logger.Info( + "DEBUG", + zap.String("destinationChainID", destinationChainID.String()), + zap.String("configuredDestinations", fmt.Sprintf("%#v", destinationIDs)), + ) return false, fmt.Errorf("relayer not configured to deliver to destination. destinationChainID=%s", destinationChainID) } if destination.useTimeInterval { @@ -103,12 +116,24 @@ func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageI } else { interval := destination.blockInterval if warpMessageInfo.BlockNumber-destination.lastBlock < uint64(interval) { + m.logger.Info( + "DEBUG", + zap.String("decision", "Not sending"), + zap.Int("blockNum", int(warpMessageInfo.BlockNumber)), + zap.Int("lastBlockNum", int(destination.lastBlock)), + ) return false, nil } } // Set the last approved block/time here. We don't set the last sent block/time until the message is actually sent destination.lastApprovedBlock = warpMessageInfo.BlockNumber destination.lastApprovedTime = warpMessageInfo.BlockTimestamp + m.logger.Info( + "DEBUG", + zap.String("decision", "Sending"), + zap.Int("blockNum", int(warpMessageInfo.BlockNumber)), + zap.Int("lastBlockNum", int(destination.lastBlock)), + ) return true, nil } diff --git a/tests/BlockHashReceiverByteCode.txt b/tests/BlockHashReceiverByteCode.txt new file mode 100644 index 00000000..efaf55db --- /dev/null +++ b/tests/BlockHashReceiverByteCode.txt @@ -0,0 +1 @@ +0x608060405234801561001057600080fd5b50600160008190558055610280806100296000396000f3fe608060405234801561001057600080fd5b50600436106100365760003560e01c806330b525591461003b578063b771b3bc14610050575b600080fd5b61004e61004936600461018f565b61007a565b005b61005e6005600160991b0181565b6040516001600160a01b03909116815260200160405180910390f35b600180541461009c5760405163a815ca6b60e01b815260040160405180910390fd5b600260015560405163ce7f592960e01b815263ffffffff8316600482015260009081906005600160991b019063ce7f592990602401606060405180830381865afa1580156100ee573d6000803e3d6000fd5b505050506040513d601f19601f8201168201806040525081019061011291906101c4565b9150915080610134576040516339ecf43d60e21b815260040160405180910390fd5b815183146101555760405163e23ef14d60e01b815260040160405180910390fd5b602082015182516040517f7770e5f72465e9b05c8076c3f2eac70898abe6a84f0259307d127c13e2a1a4e490600090a35050600180555050565b600080604083850312156101a257600080fd5b823563ffffffff811681146101b657600080fd5b946020939093013593505050565b60008082840360608112156101d857600080fd5b60408112156101e657600080fd5b506040516040810181811067ffffffffffffffff8211171561021857634e487b7160e01b600052604160045260246000fd5b60409081528451825260208086015190830152840151909250801515811461023f57600080fd5b80915050925092905056fea264697066735822122035befb02f079d9bed5ae8edf23154d8ee3b487f193f6962a513df22f76fd6b6464736f6c63430008120033 \ No newline at end of file diff --git a/tests/e2e_test.go b/tests/e2e_test.go index 4ba2a9b4..86aa17c8 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -4,8 +4,10 @@ package tests import ( + "bufio" "context" "crypto/ecdsa" + "encoding/hex" "encoding/json" "fmt" "math/big" @@ -17,21 +19,19 @@ import ( "github.com/ava-labs/avalanche-network-runner/rpcpb" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/logging" - avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" "github.com/ava-labs/awm-relayer/config" - "github.com/ava-labs/awm-relayer/database" "github.com/ava-labs/awm-relayer/messages/teleporter" "github.com/ava-labs/awm-relayer/peers" relayerEvm "github.com/ava-labs/awm-relayer/vms/evm" + "github.com/ava-labs/subnet-evm/accounts/abi" "github.com/ava-labs/subnet-evm/core/types" "github.com/ava-labs/subnet-evm/ethclient" + "github.com/ava-labs/subnet-evm/interfaces" "github.com/ava-labs/subnet-evm/plugin/evm" - "github.com/ava-labs/subnet-evm/rpc" "github.com/ava-labs/subnet-evm/tests/utils/runner" - predicateutils "github.com/ava-labs/subnet-evm/utils/predicate" - warpPayload "github.com/ava-labs/subnet-evm/warp/payload" - "github.com/ava-labs/subnet-evm/x/warp" + teleporter_block_hash "github.com/ava-labs/teleporter/abis/go/teleporter-block-hash" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/onsi/ginkgo/v2" @@ -63,7 +63,7 @@ var ( subnetIDs []ids.ID subnetA, subnetB ids.ID fundedKey *ecdsa.PrivateKey - chainBWSClient ethclient.Client + chainAWSClient, chainBWSClient ethclient.Client chainARPCClient, chainBRPCClient ethclient.Client chainARPCURI, chainBRPCURI string chainAIDInt, chainBIDInt *big.Int @@ -167,6 +167,8 @@ var _ = ginkgo.BeforeSuite(func() { chainAWSURI := httpToWebsocketURI(chainANodeURIs[0], blockchainIDA.String()) chainARPCURI = httpToRPCURI(chainANodeURIs[0], blockchainIDA.String()) log.Info("Creating ethclient for blockchainA", "wsURI", chainAWSURI, "rpcURL, chainARPCURI") + chainAWSClient, err = ethclient.Dial(chainAWSURI) + Expect(err).Should(BeNil()) chainARPCClient, err = ethclient.Dial(chainARPCURI) Expect(err).Should(BeNil()) @@ -185,98 +187,98 @@ var _ = ginkgo.BeforeSuite(func() { Expect(err).Should(BeNil()) log.Info("Finished setting up e2e test subnet variables") - log.Info("Deploying Teleporter contract to subnets") - // Read in the Teleporter contract information - teleporterContractAddress = common.HexToAddress(readHexTextFile("./tests/UniversalTeleporterMessengerContractAddress.txt")) - teleporterDeployerAddress := common.HexToAddress(readHexTextFile("./tests/UniversalTeleporterDeployerAddress.txt")) - teleporterDeployerTransaction := readHexTextFile("./tests/UniversalTeleporterDeployerTransaction.txt") - - nonceA, err := chainARPCClient.NonceAt(ctx, fundedAddress, nil) - Expect(err).Should(BeNil()) - - nonceB, err := chainBRPCClient.NonceAt(ctx, fundedAddress, nil) - Expect(err).Should(BeNil()) - - gasTipCapA, err := chainARPCClient.SuggestGasTipCap(context.Background()) - Expect(err).Should(BeNil()) - gasTipCapB, err := chainBRPCClient.SuggestGasTipCap(context.Background()) - Expect(err).Should(BeNil()) - - baseFeeA, err := chainARPCClient.EstimateBaseFee(context.Background()) - Expect(err).Should(BeNil()) - gasFeeCapA := baseFeeA.Mul(baseFeeA, big.NewInt(relayerEvm.BaseFeeFactor)) - gasFeeCapA.Add(gasFeeCapA, big.NewInt(relayerEvm.MaxPriorityFeePerGas)) - - baseFeeB, err := chainBRPCClient.EstimateBaseFee(context.Background()) - Expect(err).Should(BeNil()) - gasFeeCapB := baseFeeB.Mul(baseFeeB, big.NewInt(relayerEvm.BaseFeeFactor)) - gasFeeCapB.Add(gasFeeCapB, big.NewInt(relayerEvm.MaxPriorityFeePerGas)) - - // Fund the deployer address - { - value := big.NewInt(0).Mul(big.NewInt(1e18), big.NewInt(10)) // 10eth - txA := types.NewTx(&types.DynamicFeeTx{ - ChainID: chainAIDInt, - Nonce: nonceA, - To: &teleporterDeployerAddress, - Gas: defaultTeleporterMessageGas, - GasFeeCap: gasFeeCapA, - GasTipCap: gasTipCapA, - Value: value, - }) - txSignerA := types.LatestSignerForChainID(chainAIDInt) - triggerTxA, err := types.SignTx(txA, txSignerA, fundedKey) - Expect(err).Should(BeNil()) - err = chainARPCClient.SendTransaction(ctx, triggerTxA) - Expect(err).Should(BeNil()) - time.Sleep(5 * time.Second) - receipt, err := chainARPCClient.TransactionReceipt(ctx, triggerTxA.Hash()) - Expect(err).Should(BeNil()) - Expect(receipt.Status).Should(Equal(types.ReceiptStatusSuccessful)) - } - { - value := big.NewInt(0).Mul(big.NewInt(1e18), big.NewInt(10)) // 10eth - txB := types.NewTx(&types.DynamicFeeTx{ - ChainID: chainBIDInt, - Nonce: nonceB, - To: &teleporterDeployerAddress, - Gas: defaultTeleporterMessageGas, - GasFeeCap: gasFeeCapB, - GasTipCap: gasTipCapB, - Value: value, - }) - txSignerB := types.LatestSignerForChainID(chainBIDInt) - triggerTxB, err := types.SignTx(txB, txSignerB, fundedKey) - Expect(err).Should(BeNil()) - err = chainBRPCClient.SendTransaction(ctx, triggerTxB) - Expect(err).Should(BeNil()) - time.Sleep(5 * time.Second) - receipt, err := chainBRPCClient.TransactionReceipt(ctx, triggerTxB.Hash()) - Expect(err).Should(BeNil()) - Expect(receipt.Status).Should(Equal(types.ReceiptStatusSuccessful)) - } - // Deploy Teleporter on the two subnets - { - rpcClient, err := rpc.DialContext(ctx, chainARPCURI) - Expect(err).Should(BeNil()) - err = rpcClient.CallContext(ctx, nil, "eth_sendRawTransaction", teleporterDeployerTransaction) - Expect(err).Should(BeNil()) - time.Sleep(5 * time.Second) - teleporterCode, err := chainARPCClient.CodeAt(ctx, teleporterContractAddress, nil) - Expect(err).Should(BeNil()) - Expect(len(teleporterCode)).Should(BeNumerically(">", 2)) // 0x is an EOA, contract returns the bytecode - } - { - rpcClient, err := rpc.DialContext(ctx, chainBRPCURI) - Expect(err).Should(BeNil()) - err = rpcClient.CallContext(ctx, nil, "eth_sendRawTransaction", teleporterDeployerTransaction) - Expect(err).Should(BeNil()) - time.Sleep(5 * time.Second) - teleporterCode, err := chainBRPCClient.CodeAt(ctx, teleporterContractAddress, nil) - Expect(err).Should(BeNil()) - Expect(len(teleporterCode)).Should(BeNumerically(">", 2)) // 0x is an EOA, contract returns the bytecode - } - log.Info("Finished deploying Teleporter contracts") + // log.Info("Deploying Teleporter contract to subnets") + // // Read in the Teleporter contract information + // teleporterContractAddress = common.HexToAddress(readHexTextFile("./tests/UniversalTeleporterMessengerContractAddress.txt")) + // teleporterDeployerAddress := common.HexToAddress(readHexTextFile("./tests/UniversalTeleporterDeployerAddress.txt")) + // teleporterDeployerTransaction := readHexTextFile("./tests/UniversalTeleporterDeployerTransaction.txt") + + // nonceA, err := chainARPCClient.NonceAt(ctx, fundedAddress, nil) + // Expect(err).Should(BeNil()) + + // nonceB, err := chainBRPCClient.NonceAt(ctx, fundedAddress, nil) + // Expect(err).Should(BeNil()) + + // gasTipCapA, err := chainARPCClient.SuggestGasTipCap(context.Background()) + // Expect(err).Should(BeNil()) + // gasTipCapB, err := chainBRPCClient.SuggestGasTipCap(context.Background()) + // Expect(err).Should(BeNil()) + + // baseFeeA, err := chainARPCClient.EstimateBaseFee(context.Background()) + // Expect(err).Should(BeNil()) + // gasFeeCapA := baseFeeA.Mul(baseFeeA, big.NewInt(relayerEvm.BaseFeeFactor)) + // gasFeeCapA.Add(gasFeeCapA, big.NewInt(relayerEvm.MaxPriorityFeePerGas)) + + // baseFeeB, err := chainBRPCClient.EstimateBaseFee(context.Background()) + // Expect(err).Should(BeNil()) + // gasFeeCapB := baseFeeB.Mul(baseFeeB, big.NewInt(relayerEvm.BaseFeeFactor)) + // gasFeeCapB.Add(gasFeeCapB, big.NewInt(relayerEvm.MaxPriorityFeePerGas)) + + // // Fund the deployer address + // { + // value := big.NewInt(0).Mul(big.NewInt(1e18), big.NewInt(10)) // 10eth + // txA := types.NewTx(&types.DynamicFeeTx{ + // ChainID: chainAIDInt, + // Nonce: nonceA, + // To: &teleporterDeployerAddress, + // Gas: defaultTeleporterMessageGas, + // GasFeeCap: gasFeeCapA, + // GasTipCap: gasTipCapA, + // Value: value, + // }) + // txSignerA := types.LatestSignerForChainID(chainAIDInt) + // triggerTxA, err := types.SignTx(txA, txSignerA, fundedKey) + // Expect(err).Should(BeNil()) + // err = chainARPCClient.SendTransaction(ctx, triggerTxA) + // Expect(err).Should(BeNil()) + // time.Sleep(5 * time.Second) + // receipt, err := chainARPCClient.TransactionReceipt(ctx, triggerTxA.Hash()) + // Expect(err).Should(BeNil()) + // Expect(receipt.Status).Should(Equal(types.ReceiptStatusSuccessful)) + // } + // { + // value := big.NewInt(0).Mul(big.NewInt(1e18), big.NewInt(10)) // 10eth + // txB := types.NewTx(&types.DynamicFeeTx{ + // ChainID: chainBIDInt, + // Nonce: nonceB, + // To: &teleporterDeployerAddress, + // Gas: defaultTeleporterMessageGas, + // GasFeeCap: gasFeeCapB, + // GasTipCap: gasTipCapB, + // Value: value, + // }) + // txSignerB := types.LatestSignerForChainID(chainBIDInt) + // triggerTxB, err := types.SignTx(txB, txSignerB, fundedKey) + // Expect(err).Should(BeNil()) + // err = chainBRPCClient.SendTransaction(ctx, triggerTxB) + // Expect(err).Should(BeNil()) + // time.Sleep(5 * time.Second) + // receipt, err := chainBRPCClient.TransactionReceipt(ctx, triggerTxB.Hash()) + // Expect(err).Should(BeNil()) + // Expect(receipt.Status).Should(Equal(types.ReceiptStatusSuccessful)) + // } + // // Deploy Teleporter on the two subnets + // { + // rpcClient, err := rpc.DialContext(ctx, chainARPCURI) + // Expect(err).Should(BeNil()) + // err = rpcClient.CallContext(ctx, nil, "eth_sendRawTransaction", teleporterDeployerTransaction) + // Expect(err).Should(BeNil()) + // time.Sleep(5 * time.Second) + // teleporterCode, err := chainARPCClient.CodeAt(ctx, teleporterContractAddress, nil) + // Expect(err).Should(BeNil()) + // Expect(len(teleporterCode)).Should(BeNumerically(">", 2)) // 0x is an EOA, contract returns the bytecode + // } + // { + // rpcClient, err := rpc.DialContext(ctx, chainBRPCURI) + // Expect(err).Should(BeNil()) + // err = rpcClient.CallContext(ctx, nil, "eth_sendRawTransaction", teleporterDeployerTransaction) + // Expect(err).Should(BeNil()) + // time.Sleep(5 * time.Second) + // teleporterCode, err := chainBRPCClient.CodeAt(ctx, teleporterContractAddress, nil) + // Expect(err).Should(BeNil()) + // Expect(len(teleporterCode)).Should(BeNumerically(">", 2)) // 0x is an EOA, contract returns the bytecode + // } + // log.Info("Finished deploying Teleporter contracts") log.Info("Set up ginkgo before suite") }) @@ -295,15 +297,315 @@ var _ = ginkgo.AfterSuite(func() { // will then send a transaction to the source subnet to issue a Warp message simulting a transaction // sent from the Teleporter contract. The relayer will then wait for the transaction to be confirmed // on the destination subnet and verify that the Warp message was received and unpacked correctly. -var _ = ginkgo.Describe("[Relayer E2E]", ginkgo.Ordered, func() { +// var _ = ginkgo.Describe("[Relayer E2E]", ginkgo.Ordered, func() { +// var ( +// receivedWarpMessage *avalancheWarp.Message +// payload []byte +// relayerCmd *exec.Cmd +// relayerCancel context.CancelFunc +// ) + +// ginkgo.It("Set up relayer config", ginkgo.Label("Relayer", "Setup Relayer"), func() { +// hostA, portA, err := getURIHostAndPort(chainANodeURIs[0]) +// Expect(err).Should(BeNil()) + +// hostB, portB, err := getURIHostAndPort(chainBNodeURIs[0]) +// Expect(err).Should(BeNil()) + +// log.Info( +// "Setting up relayer config", +// "hostA", hostA, +// "portA", portA, +// "blockChainA", blockchainIDA.String(), +// "hostB", hostB, +// "portB", portB, +// "blockChainB", blockchainIDB.String(), +// "subnetA", subnetA.String(), +// "subnetB", subnetB.String(), +// ) + +// relayerConfig := config.Config{ +// LogLevel: logging.Info.LowerString(), +// NetworkID: peers.LocalNetworkID, +// PChainAPIURL: chainANodeURIs[0], +// EncryptConnection: false, +// StorageLocation: storageLocation, +// SourceSubnets: []config.SourceSubnet{ +// { +// SubnetID: subnetA.String(), +// ChainID: blockchainIDA.String(), +// VM: config.EVM.String(), +// EncryptConnection: false, +// APINodeHost: hostA, +// APINodePort: portA, +// MessageContracts: map[string]config.MessageProtocolConfig{ +// teleporterContractAddress.Hex(): { +// MessageFormat: config.TELEPORTER.String(), +// Settings: map[string]interface{}{ +// "reward-address": fundedAddress.Hex(), +// }, +// }, +// }, +// }, +// }, +// DestinationSubnets: []config.DestinationSubnet{ +// { +// SubnetID: subnetB.String(), +// ChainID: blockchainIDB.String(), +// VM: config.EVM.String(), +// EncryptConnection: false, +// APINodeHost: hostB, +// APINodePort: portB, +// AccountPrivateKey: fundedKeyStr, +// }, +// }, +// } + +// data, err := json.MarshalIndent(relayerConfig, "", "\t") +// Expect(err).Should(BeNil()) + +// f, err := os.CreateTemp(os.TempDir(), "relayer-config.json") +// Expect(err).Should(BeNil()) + +// _, err = f.Write(data) +// Expect(err).Should(BeNil()) +// relayerConfigPath = f.Name() + +// log.Info("Created awm-relayer config", "configPath", relayerConfigPath, "config", string(data)) +// }) + +// ginkgo.It("Build Relayer", ginkgo.Label("Relayer", "Build Relayer"), func() { +// // Build the awm-relayer binary +// cmd := exec.Command("./scripts/build.sh") +// out, err := cmd.CombinedOutput() +// fmt.Println(string(out)) +// Expect(err).Should(BeNil()) +// }) + +// // Send a transaction to Subnet A to issue a Warp Message from the Teleporter contract to Subnet B +// ginkgo.It("Send Message from A to B", ginkgo.Label("Warp", "SendWarp"), func() { +// ctx := context.Background() + +// relayerCmd, relayerCancel = runRelayerExecutable(ctx) + +// nonceA, err := chainARPCClient.NonceAt(ctx, fundedAddress, nil) +// Expect(err).Should(BeNil()) + +// nonceB, err := chainBRPCClient.NonceAt(ctx, fundedAddress, nil) +// Expect(err).Should(BeNil()) + +// log.Info("Packing teleporter message", "nonceA", nonceA, "nonceB", nonceB) +// payload, err = teleporter.PackSendCrossChainMessageEvent(common.Hash(blockchainIDB), teleporterMessage) +// Expect(err).Should(BeNil()) + +// data, err := teleporter.EVMTeleporterContractABI.Pack( +// "sendCrossChainMessage", +// TeleporterMessageInput{ +// DestinationChainID: blockchainIDB, +// DestinationAddress: fundedAddress, +// FeeInfo: FeeInfo{ +// ContractAddress: fundedAddress, +// Amount: big.NewInt(0), +// }, +// RequiredGasLimit: big.NewInt(1), +// AllowedRelayerAddresses: []common.Address{}, +// Message: []byte{1, 2, 3, 4}, +// }, +// ) +// Expect(err).Should(BeNil()) + +// // Send a transaction to the Teleporter contract +// tx := newTestTeleporterMessage(chainAIDInt, teleporterContractAddress, nonceA, data) + +// txSigner := types.LatestSignerForChainID(chainAIDInt) +// signedTx, err := types.SignTx(tx, txSigner, fundedKey) +// Expect(err).Should(BeNil()) + +// // Sleep for some time to make sure relayer has started up and subscribed. +// time.Sleep(15 * time.Second) +// log.Info("Subscribing to new heads on destination chain") + +// newHeadsB := make(chan *types.Header, 10) +// sub, err := chainBWSClient.SubscribeNewHead(ctx, newHeadsB) +// Expect(err).Should(BeNil()) +// defer sub.Unsubscribe() + +// log.Info("Sending sendWarpMessage transaction", "destinationChainID", blockchainIDB, "txHash", signedTx.Hash()) +// err = chainARPCClient.SendTransaction(ctx, signedTx) +// Expect(err).Should(BeNil()) + +// time.Sleep(5 * time.Second) +// receipt, err := chainARPCClient.TransactionReceipt(ctx, signedTx.Hash()) +// Expect(err).Should(BeNil()) +// Expect(receipt.Status).Should(Equal(types.ReceiptStatusSuccessful)) + +// // Get the latest block from Subnet B +// log.Info("Waiting for new block confirmation") +// newHead := <-newHeadsB +// log.Info("Received new head", "height", newHead.Number.Uint64()) +// blockHash := newHead.Hash() +// block, err := chainBRPCClient.BlockByHash(ctx, blockHash) +// Expect(err).Should(BeNil()) +// log.Info( +// "Got block", +// "blockHash", blockHash, +// "blockNumber", block.NumberU64(), +// "transactions", block.Transactions(), +// "numTransactions", len(block.Transactions()), +// "block", block, +// ) +// accessLists := block.Transactions()[0].AccessList() +// Expect(len(accessLists)).Should(Equal(1)) +// Expect(accessLists[0].Address).Should(Equal(warp.Module.Address)) + +// // Check the transaction storage key has warp message we're expecting +// storageKeyHashes := accessLists[0].StorageKeys +// packedPredicate := predicateutils.HashSliceToBytes(storageKeyHashes) +// predicateBytes, err := predicateutils.UnpackPredicate(packedPredicate) +// Expect(err).Should(BeNil()) +// receivedWarpMessage, err = avalancheWarp.ParseMessage(predicateBytes) +// Expect(err).Should(BeNil()) + +// // Check that the transaction has successful receipt status +// txHash := block.Transactions()[0].Hash() +// receipt, err = chainBRPCClient.TransactionReceipt(ctx, txHash) +// Expect(err).Should(BeNil()) +// Expect(receipt.Status).Should(Equal(types.ReceiptStatusSuccessful)) + +// log.Info("Finished sending warp message, closing down output channel") + +// // Cancel the command and stop the relayer +// relayerCancel() +// _ = relayerCmd.Wait() +// }) + +// ginkgo.It("Try relaying already delivered message", ginkgo.Label("Relayer", "RelayerAlreadyDeliveredMessage"), func() { +// ctx := context.Background() +// logger := logging.NewLogger( +// "awm-relayer", +// logging.NewWrappedCore( +// logging.Info, +// os.Stdout, +// logging.JSON.ConsoleEncoder(), +// ), +// ) +// jsonDB, err := database.NewJSONFileStorage(logger, storageLocation, []ids.ID{blockchainIDA, blockchainIDB}) +// Expect(err).Should(BeNil()) + +// // Modify the JSON database to force the relayer to re-process old blocks +// jsonDB.Put(blockchainIDA, []byte(database.LatestProcessedBlockKey), []byte("0")) +// jsonDB.Put(blockchainIDB, []byte(database.LatestProcessedBlockKey), []byte("0")) + +// // Subscribe to the destination chain block published +// newHeadsB := make(chan *types.Header, 10) +// sub, err := chainBWSClient.SubscribeNewHead(ctx, newHeadsB) +// Expect(err).Should(BeNil()) +// defer sub.Unsubscribe() + +// // Run the relayer +// relayerCmd, relayerCancel = runRelayerExecutable(ctx) + +// // We should not receive a new block on subnet B, since the relayer should have seen the Teleporter message was already delivered +// Consistently(newHeadsB, 10*time.Second, 500*time.Millisecond).ShouldNot(Receive()) + +// // Cancel the command and stop the relayer +// relayerCancel() +// _ = relayerCmd.Wait() +// }) + +// ginkgo.It("Validate Received Warp Message Values", ginkgo.Label("Relayer", "VerifyWarp"), func() { +// Expect(receivedWarpMessage.SourceChainID).Should(Equal(blockchainIDA)) +// addressedPayload, err := warpPayload.ParseAddressedPayload(receivedWarpMessage.Payload) +// Expect(err).Should(BeNil()) + +// receivedDestinationID, err := ids.ToID(addressedPayload.DestinationChainID.Bytes()) +// Expect(err).Should(BeNil()) +// Expect(receivedDestinationID).Should(Equal(blockchainIDB)) +// Expect(addressedPayload.DestinationAddress).Should(Equal(teleporterContractAddress)) +// Expect(addressedPayload.Payload).Should(Equal(payload)) + +// // Check that the teleporter message is correct +// receivedTeleporterMessage, err := teleporter.UnpackTeleporterMessage(addressedPayload.Payload) +// Expect(err).Should(BeNil()) +// Expect(*receivedTeleporterMessage).Should(Equal(teleporterMessage)) +// }) +// }) + +var _ = ginkgo.Describe("[Relayer Publish Block Hash]", ginkgo.Ordered, func() { var ( - receivedWarpMessage *avalancheWarp.Message - payload []byte - relayerCmd *exec.Cmd - relayerCancel context.CancelFunc + relayerCmd *exec.Cmd + relayerCancel context.CancelFunc + blockHashReceiverAddressB common.Address + subnetAHashes []common.Hash + blockHashABI *abi.ABI ) + ginkgo.It("Deploy block hash receiver", ginkgo.Label("Relayer", "DeployBlockHashReceiver"), func() { + ctx := context.Background() + blockHashReceiverByteCode := readHexTextFile("./tests/BlockHashReceiverByteCode.txt") + + nonceB, err := chainBRPCClient.NonceAt(ctx, fundedAddress, nil) + Expect(err).Should(BeNil()) + + // gasTipCapA, err := chainARPCClient.SuggestGasTipCap(context.Background()) + // Expect(err).Should(BeNil()) + + // baseFeeA, err := chainARPCClient.EstimateBaseFee(context.Background()) + // Expect(err).Should(BeNil()) + // gasFeeCapA := baseFeeA.Mul(baseFeeA, big.NewInt(relayerEvm.BaseFeeFactor)) + // gasFeeCapA.Add(gasFeeCapA, big.NewInt(relayerEvm.MaxPriorityFeePerGas)) + + // contractAuth, err := bind.NewKeyedTransactorWithChainID(fundedKey, big.NewInt(peers.LocalNetworkID)) + // contractAuth.Nonce = big.NewInt(int64(nonceA)) + // contractAuth.GasFeeCap = gasFeeCapA + // contractAuth.GasTipCap = gasTipCapA + // contractAuth.GasLimit = 1000000 + + // Expect(err).Should(BeNil()) + // blockHashReceiverAddressB, _, _, err = bind.DeployContract( + // contractAuth, + // *blockHashABI, + // common.FromHex(blockHashReceiverByteCode), + // chainBRPCClient, + // ) + // Expect(err).Should(BeNil()) + blockHashABI, err = teleporter_block_hash.TeleporterBlockHashMetaData.GetAbi() + Expect(err).Should(BeNil()) + blockHashReceiverAddressB, err = deriveEVMContractAddress(fundedAddress, nonceB) + Expect(err).Should(BeNil()) + + cmdOutput := make(chan string) + cmd := exec.Command( + "cast", + "send", + "--rpc-url", chainBRPCURI, + "--private-key", hexutil.Encode(fundedKey.D.Bytes()), + "--create", blockHashReceiverByteCode, + ) + + // Set up a pipe to capture the command's output + cmdReader, _ := cmd.StdoutPipe() + + // Start a goroutine to read and output the command's stdout + go func() { + scanner := bufio.NewScanner(cmdReader) + for scanner.Scan() { + log.Info(scanner.Text()) + } + cmdOutput <- "Command execution finished" + }() + + err = cmd.Run() + Expect(err).Should(BeNil()) + + time.Sleep(5 * time.Second) + deployedCode, err := chainBRPCClient.CodeAt(ctx, blockHashReceiverAddressB, nil) + Expect(err).Should(BeNil()) + Expect(len(deployedCode)).Should(BeNumerically(">", 2)) // 0x is an EOA, contract returns the bytecode + + log.Info("Deployed block hash receiver contract", "address", blockHashReceiverAddressB.Hex()) + }) - ginkgo.It("Set up relayer config", ginkgo.Label("Relayer", "Setup Relayer"), func() { + ginkgo.It("Set up relayer config", ginkgo.Label("Relayer", "SetupRelayer"), func() { hostA, portA, err := getURIHostAndPort(chainANodeURIs[0]) Expect(err).Should(BeNil()) @@ -332,15 +634,25 @@ var _ = ginkgo.Describe("[Relayer E2E]", ginkgo.Ordered, func() { { SubnetID: subnetA.String(), ChainID: blockchainIDA.String(), - VM: config.EVM.String(), + VM: config.EVM_BLOCKHASH.String(), EncryptConnection: false, APINodeHost: hostA, APINodePort: portA, MessageContracts: map[string]config.MessageProtocolConfig{ - teleporterContractAddress.Hex(): { - MessageFormat: config.TELEPORTER.String(), + "0x0000000000000000000000000000000000000000": { + MessageFormat: config.BLOCK_HASH_PUBLISHER.String(), Settings: map[string]interface{}{ - "reward-address": fundedAddress.Hex(), + "destination-chains": []struct { + ChainID string `json:"chain-id"` + Address string `json:"address"` + Interval string `json:"interval"` + }{ + { + ChainID: blockchainIDB.String(), + Address: blockHashReceiverAddressB.String(), + Interval: "5", + }, + }, }, }, }, @@ -372,7 +684,7 @@ var _ = ginkgo.Describe("[Relayer E2E]", ginkgo.Ordered, func() { log.Info("Created awm-relayer config", "configPath", relayerConfigPath, "config", string(data)) }) - ginkgo.It("Build Relayer", ginkgo.Label("Relayer", "Build Relayer"), func() { + ginkgo.It("Build Relayer", ginkgo.Label("Relayer", "BuildRelayer"), func() { // Build the awm-relayer binary cmd := exec.Command("./scripts/build.sh") out, err := cmd.CombinedOutput() @@ -381,150 +693,146 @@ var _ = ginkgo.Describe("[Relayer E2E]", ginkgo.Ordered, func() { }) // Send a transaction to Subnet A to issue a Warp Message from the Teleporter contract to Subnet B - ginkgo.It("Send Message from A to B", ginkgo.Label("Warp", "SendWarp"), func() { + ginkgo.It("Publish block hash", ginkgo.Label("Relayer", "PublishHash"), func() { ctx := context.Background() relayerCmd, relayerCancel = runRelayerExecutable(ctx) - nonceA, err := chainARPCClient.NonceAt(ctx, fundedAddress, nil) Expect(err).Should(BeNil()) - nonceB, err := chainBRPCClient.NonceAt(ctx, fundedAddress, nil) - Expect(err).Should(BeNil()) - - log.Info("Packing teleporter message", "nonceA", nonceA, "nonceB", nonceB) - payload, err = teleporter.PackSendCrossChainMessageEvent(common.Hash(blockchainIDB), teleporterMessage) - Expect(err).Should(BeNil()) - - data, err := teleporter.EVMTeleporterContractABI.Pack( - "sendCrossChainMessage", - TeleporterMessageInput{ - DestinationChainID: blockchainIDB, - DestinationAddress: fundedAddress, - FeeInfo: FeeInfo{ - ContractAddress: fundedAddress, - Amount: big.NewInt(0), - }, - RequiredGasLimit: big.NewInt(1), - AllowedRelayerAddresses: []common.Address{}, - Message: []byte{1, 2, 3, 4}, - }, - ) - Expect(err).Should(BeNil()) - - // Send a transaction to the Teleporter contract - tx := newTestTeleporterMessage(chainAIDInt, teleporterContractAddress, nonceA, data) - - txSigner := types.LatestSignerForChainID(chainAIDInt) - signedTx, err := types.SignTx(tx, txSigner, fundedKey) - Expect(err).Should(BeNil()) - - // Sleep for some time to make sure relayer has started up and subscribed. - time.Sleep(15 * time.Second) - log.Info("Subscribing to new heads on destination chain") - - newHeadsB := make(chan *types.Header, 10) - sub, err := chainBWSClient.SubscribeNewHead(ctx, newHeadsB) - Expect(err).Should(BeNil()) - defer sub.Unsubscribe() - - log.Info("Sending sendWarpMessage transaction", "destinationChainID", blockchainIDB, "txHash", signedTx.Hash()) - err = chainARPCClient.SendTransaction(ctx, signedTx) - Expect(err).Should(BeNil()) - - time.Sleep(5 * time.Second) - receipt, err := chainARPCClient.TransactionReceipt(ctx, signedTx.Hash()) - Expect(err).Should(BeNil()) - Expect(receipt.Status).Should(Equal(types.ReceiptStatusSuccessful)) - - // Get the latest block from Subnet B - log.Info("Waiting for new block confirmation") - newHead := <-newHeadsB - log.Info("Received new head", "height", newHead.Number.Uint64()) - blockHash := newHead.Hash() - block, err := chainBRPCClient.BlockByHash(ctx, blockHash) - Expect(err).Should(BeNil()) - log.Info( - "Got block", - "blockHash", blockHash, - "blockNumber", block.NumberU64(), - "transactions", block.Transactions(), - "numTransactions", len(block.Transactions()), - "block", block, - ) - accessLists := block.Transactions()[0].AccessList() - Expect(len(accessLists)).Should(Equal(1)) - Expect(accessLists[0].Address).Should(Equal(warp.Module.Address)) - - // Check the transaction storage key has warp message we're expecting - storageKeyHashes := accessLists[0].StorageKeys - packedPredicate := predicateutils.HashSliceToBytes(storageKeyHashes) - predicateBytes, err := predicateutils.UnpackPredicate(packedPredicate) - Expect(err).Should(BeNil()) - receivedWarpMessage, err = avalancheWarp.ParseMessage(predicateBytes) + destinationAddress := common.HexToAddress("0x0000000000000000000000000000000000000000") + gasTipCapA, err := chainARPCClient.SuggestGasTipCap(context.Background()) Expect(err).Should(BeNil()) - // Check that the transaction has successful receipt status - txHash := block.Transactions()[0].Hash() - receipt, err = chainBRPCClient.TransactionReceipt(ctx, txHash) + baseFeeA, err := chainARPCClient.EstimateBaseFee(context.Background()) Expect(err).Should(BeNil()) - Expect(receipt.Status).Should(Equal(types.ReceiptStatusSuccessful)) - - log.Info("Finished sending warp message, closing down output channel") - - // Cancel the command and stop the relayer - relayerCancel() - _ = relayerCmd.Wait() - }) + gasFeeCapA := baseFeeA.Mul(baseFeeA, big.NewInt(relayerEvm.BaseFeeFactor)) + gasFeeCapA.Add(gasFeeCapA, big.NewInt(relayerEvm.MaxPriorityFeePerGas)) - ginkgo.It("Try relaying already delivered message", ginkgo.Label("Relayer", "RelayerAlreadyDeliveredMessage"), func() { - ctx := context.Background() - logger := logging.NewLogger( - "awm-relayer", - logging.NewWrappedCore( - logging.Info, - os.Stdout, - logging.JSON.ConsoleEncoder(), - ), - ) - jsonDB, err := database.NewJSONFileStorage(logger, storageLocation, []ids.ID{blockchainIDA, blockchainIDB}) + // Subscribe to the destination chain block published + newHeadsA := make(chan *types.Header, 10) + subA, err := chainAWSClient.SubscribeNewHead(ctx, newHeadsA) Expect(err).Should(BeNil()) - - // Modify the JSON database to force the relayer to re-process old blocks - jsonDB.Put(blockchainIDA, []byte(database.LatestProcessedBlockKey), []byte("0")) - jsonDB.Put(blockchainIDB, []byte(database.LatestProcessedBlockKey), []byte("0")) + defer subA.Unsubscribe() // Subscribe to the destination chain block published newHeadsB := make(chan *types.Header, 10) - sub, err := chainBWSClient.SubscribeNewHead(ctx, newHeadsB) - Expect(err).Should(BeNil()) - defer sub.Unsubscribe() + subB, err := chainBWSClient.SubscribeNewHead(ctx, newHeadsB) + Expect(err).Should(BeNil()) + defer subB.Unsubscribe() + + // TODONOW: not necessarily true, since the block height might be < 5 + // Send 6 transactions to produce 6 blocks on subnet A + // We expect the first and the sixth txs to be published by the relayer, since the relayer + // will be initialized with the "latest published block" to be height 0 + + for i := 0; i < 6; i++ { + value := big.NewInt(0).Mul(big.NewInt(1e18), big.NewInt(1)) // 1eth + txA := types.NewTx(&types.DynamicFeeTx{ + ChainID: chainAIDInt, + Nonce: nonceA + uint64(i), + To: &destinationAddress, + Gas: defaultTeleporterMessageGas, + GasFeeCap: gasFeeCapA, + GasTipCap: gasTipCapA, + Value: value, + }) + txSignerA := types.LatestSignerForChainID(chainAIDInt) + triggerTxA, err := types.SignTx(txA, txSignerA, fundedKey) + Expect(err).Should(BeNil()) + err = chainARPCClient.SendTransaction(ctx, triggerTxA) + Expect(err).Should(BeNil()) + + log.Info("Waiting for new block confirmation", "block", i) + newHeadA := <-newHeadsA + subnetAHashes = append(subnetAHashes, newHeadA.Hash()) + } - // Run the relayer - relayerCmd, relayerCancel = runRelayerExecutable(ctx) + time.Sleep(5 * time.Second) - // We should not receive a new block on subnet B, since the relayer should have seen the Teleporter message was already delivered - Consistently(newHeadsB, 10*time.Second, 500*time.Millisecond).ShouldNot(Receive()) + for { + newHeadB := <-newHeadsB + log.Info("Fetching log from the newly produced block") + + blockHashB := newHeadB.Hash() + + block, err := chainBRPCClient.BlockByHash(ctx, blockHashB) + Expect(err).Should(BeNil()) + txs := block.Transactions() + log.Info(fmt.Sprintf("numTxs: %d", len(txs))) + for _, tx := range txs { + log.Info(fmt.Sprintf("txHash: %s", tx.Hash().String())) + log.Info(fmt.Sprintf("to: %s", tx.To().String())) + log.Info(fmt.Sprintf("data: %s", hex.EncodeToString(tx.Data()))) + receipt, err := chainBRPCClient.TransactionReceipt(ctx, tx.Hash()) + Expect(err).Should(BeNil()) + Expect(receipt.Status).Should(Equal(types.ReceiptStatusSuccessful)) + } + + // receipt, err := chainBRPCClient.TransactionReceipt(ctx, blockHashB) + // Expect(err).Should(BeNil()) + // Expect(receipt.Status).Should(Equal(types.ReceiptStatusSuccessful)) + + log.Info("event", blockHashABI.Events["ReceiveBlockHash"].ID.String()) //TODONOW remove this + logs, err := chainBRPCClient.FilterLogs(ctx, interfaces.FilterQuery{ + // BlockHash: &blockHashB, + Addresses: []common.Address{blockHashReceiverAddressB}, + // Topics: [][]common.Hash{ + // { + // blockHashABI.Events["ReceiveBlockHash"].ID, + // }, + // }, + }) + Expect(err).Should(BeNil()) + log.Info("Logs", "logs", logs) + } + // // Wait for new blocks on B. We are expecting 2 new blocks + // { + // newHeadB := <-newHeadsB + // blockHashB := newHeadB.Hash() + + // log.Info("Fetching relevant warp logs from the newly produced block") + // logs, err := chainBRPCClient.FilterLogs(ctx, interfaces.FilterQuery{ + // BlockHash: &blockHashB, + // // Addresses: []common.Address{blockHashReceiverAddressB}, + // // Topics: [][]common.Hash{ + // // { + // // blockHashABI.Events["ReceiveBlockHash"].ID, + // // }, + // // }, + // }) + // log.Info("Logs", "logs", logs) + // Expect(err).Should(BeNil()) + // Expect(len(logs)).Should(Equal(1)) + + // Expect(logs[0].Topics[2]).Should(Equal(subnetAHashes[0])) + // } + // { + // newHeadB := <-newHeadsB + // blockHashB := newHeadB.Hash() + + // log.Info("Fetching relevant warp logs from the newly produced block") + // logs, err := chainBRPCClient.FilterLogs(ctx, interfaces.FilterQuery{ + // BlockHash: &blockHashB, + // Addresses: []common.Address{blockHashReceiverAddressB}, + // Topics: [][]common.Hash{ + // { + // blockHashABI.Events["ReceiveBlockHash"].ID, + // }, + // }, + // }) + // Expect(err).Should(BeNil()) + // Expect(len(logs)).Should(Equal(1)) + + // Expect(logs[0].Topics[2]).Should(Equal(subnetAHashes[5])) + // } // Cancel the command and stop the relayer relayerCancel() _ = relayerCmd.Wait() }) - ginkgo.It("Validate Received Warp Message Values", ginkgo.Label("Relayer", "VerifyWarp"), func() { - Expect(receivedWarpMessage.SourceChainID).Should(Equal(blockchainIDA)) - addressedPayload, err := warpPayload.ParseAddressedPayload(receivedWarpMessage.Payload) - Expect(err).Should(BeNil()) + ginkgo.It("Verify received block hash", ginkgo.Label("Relayer", "VerifyReceivedHash"), func() { - receivedDestinationID, err := ids.ToID(addressedPayload.DestinationChainID.Bytes()) - Expect(err).Should(BeNil()) - Expect(receivedDestinationID).Should(Equal(blockchainIDB)) - Expect(addressedPayload.DestinationAddress).Should(Equal(teleporterContractAddress)) - Expect(addressedPayload.Payload).Should(Equal(payload)) - - // Check that the teleporter message is correct - receivedTeleporterMessage, err := teleporter.UnpackTeleporterMessage(addressedPayload.Payload) - Expect(err).Should(BeNil()) - Expect(*receivedTeleporterMessage).Should(Equal(teleporterMessage)) }) }) diff --git a/tests/utils.go b/tests/utils.go index cd60ab3b..d1241c6c 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -21,8 +21,11 @@ import ( "github.com/ava-labs/subnet-evm/tests/utils" "github.com/ava-labs/subnet-evm/tests/utils/runner" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" . "github.com/onsi/gomega" + "github.com/pkg/errors" ) var ( @@ -140,3 +143,18 @@ func setUpProposerVm(ctx context.Context, fundedKey *ecdsa.PrivateKey, manager * err = utils.IssueTxsToActivateProposerVMFork(ctx, chainIDInt, fundedKey, client) Expect(err).Should(BeNil()) } + +// TODONOW: remove this once available in teleporter +func deriveEVMContractAddress(sender common.Address, nonce uint64) (common.Address, error) { + type AddressNonce struct { + Address common.Address + Nonce uint64 + } + addressNonce := AddressNonce{sender, nonce} + rlpEncoded, err := rlp.EncodeToBytes(addressNonce) + if err != nil { + return common.Address{}, errors.Wrap(err, "Failed to RLP encode address and nonce value.") + } + hash := crypto.Keccak256Hash(rlpEncoded) + return common.HexToAddress(fmt.Sprintf("0x%x", hash.Bytes()[12:])), nil +} diff --git a/vms/evm_block_hash/subscriber.go b/vms/evm_block_hash/subscriber.go index 96dff49e..328999b7 100644 --- a/vms/evm_block_hash/subscriber.go +++ b/vms/evm_block_hash/subscriber.go @@ -2,6 +2,7 @@ package evm_block_hash import ( "context" + "encoding/json" "errors" "fmt" "math/big" @@ -34,13 +35,14 @@ var ( // subscriber implements Subscriber type subscriber struct { - nodeWSURL string - nodeRPCURL string - chainID ids.ID - logsChan chan vmtypes.WarpMessageInfo - blocks <-chan *types.Header - sub interfaces.Subscription - networkID uint32 + nodeWSURL string + nodeRPCURL string + chainID ids.ID + logsChan chan vmtypes.WarpMessageInfo + blocks <-chan *types.Header + sub interfaces.Subscription + networkID uint32 + destinationChainIDs []ids.ID logger logging.Logger db database.RelayerDatabase @@ -59,14 +61,54 @@ func NewSubscriber(logger logging.Logger, subnetInfo config.SourceSubnet, db dat logs := make(chan vmtypes.WarpMessageInfo, maxClientSubscriptionBuffer) + var destinationChainIDs []ids.ID + // Extract the destination chain IDs from the EVM Block Hash config + for _, cfg := range subnetInfo.MessageContracts { + if config.ParseMessageProtocol(cfg.MessageFormat) == config.BLOCK_HASH_PUBLISHER { + // Marshal the map and unmarshal into the Block Hash Publisher config + data, err := json.Marshal(cfg.Settings) + if err != nil { + logger.Error("Failed to marshal Block Hash Publisher config") + return nil + } + // TODONOW: Don't repeat the block hash config here. Need to resolve dependency cycle + type destinationInfo struct { + ChainID string `json:"chain-id"` + Address string `json:"address"` + Interval string `json:"interval"` + } + + type config struct { + DestinationChains []destinationInfo `json:"destination-chains"` + } + var messageConfig config + if err := json.Unmarshal(data, &messageConfig); err != nil { + logger.Error("Failed to unmarshal Block Hash Publisher config") + return nil + } + for _, chainInfo := range messageConfig.DestinationChains { + chainID, err := ids.FromString(chainInfo.ChainID) + if err != nil { + logger.Error( + "Failed to decode base-58 encoded destination chain ID", + zap.Error(err), + ) + return nil + } + destinationChainIDs = append(destinationChainIDs, chainID) + } + } + } + return &subscriber{ - nodeWSURL: subnetInfo.GetNodeWSEndpoint(), - nodeRPCURL: subnetInfo.GetNodeRPCEndpoint(), - chainID: chainID, - logger: logger, - db: db, - logsChan: logs, - networkID: config.GetNetworkID(), + nodeWSURL: subnetInfo.GetNodeWSEndpoint(), + nodeRPCURL: subnetInfo.GetNodeRPCEndpoint(), + chainID: chainID, + logger: logger, + db: db, + logsChan: logs, + networkID: config.GetNetworkID(), + destinationChainIDs: destinationChainIDs, } } @@ -128,7 +170,7 @@ func (s *subscriber) dialAndSubscribe() error { return nil } -func (s *subscriber) NewWarpMessageInfo(block *types.Header) (*vmtypes.WarpMessageInfo, error) { +func (s *subscriber) NewWarpMessageInfo(block *types.Header, destinationChainID ids.ID) (*vmtypes.WarpMessageInfo, error) { blockHashPayload, err := payload.NewBlockHashPayload(block.Hash()) if err != nil { return nil, err @@ -143,24 +185,30 @@ func (s *subscriber) NewWarpMessageInfo(block *types.Header) (*vmtypes.WarpMessa } return &vmtypes.WarpMessageInfo{ - UnsignedMsgBytes: unsignedMessage.Bytes(), - BlockNumber: block.Number.Uint64(), - BlockTimestamp: block.Time, + DestinationChainID: destinationChainID, + UnsignedMsgBytes: unsignedMessage.Bytes(), + BlockNumber: block.Number.Uint64(), + BlockTimestamp: block.Time, }, nil } // forward logs from the concrete log channel to the interface channel func (s *subscriber) forwardLogs() { + s.logger.Info("DEBUG Forwarding logs") for block := range s.blocks { - messageInfo, err := s.NewWarpMessageInfo(block) - if err != nil { - s.logger.Error( - "Invalid log. Continuing.", - zap.Error(err), - ) - continue + // Fan out to all destination chains + for _, destinationChainID := range s.destinationChainIDs { + s.logger.Info("DEBUG forwarding to destination", zap.String("destinationChainID", destinationChainID.String())) + messageInfo, err := s.NewWarpMessageInfo(block, destinationChainID) + if err != nil { + s.logger.Error( + "Invalid log. Continuing.", + zap.Error(err), + ) + continue + } + s.logsChan <- *messageInfo } - s.logsChan <- *messageInfo } }