From 2eea50383951496e37842c9b1b6fb8f57f5e8f21 Mon Sep 17 00:00:00 2001 From: moshe-blox <89339422+moshe-blox@users.noreply.github.com> Date: Tue, 23 Jul 2024 14:39:35 +0300 Subject: [PATCH] Custom DomainType and Trusted Peers (#1493) * enable custom domain support * update domain type while loading shares * print self address * new address format * fix unit test * parse trusted peers in p2p.New --------- Co-authored-by: Lior Rutenberg Co-authored-by: rehs0y --- cli/operator/node.go | 24 ++++++++- network/discovery/dv5_service.go | 8 +-- network/discovery/dv5_service_test.go | 4 +- network/p2p/config.go | 7 +-- network/p2p/p2p.go | 78 ++++++++++++++++++++++----- network/p2p/test_utils.go | 5 +- operator/node.go | 1 + registry/storage/shares.go | 1 + 8 files changed, 105 insertions(+), 23 deletions(-) diff --git a/cli/operator/node.go b/cli/operator/node.go index 0b877ea83b..71f18c4ca1 100644 --- a/cli/operator/node.go +++ b/cli/operator/node.go @@ -3,11 +3,13 @@ package operator import ( "context" "encoding/base64" + "encoding/hex" "fmt" "log" "math/big" "net/http" "os" + "strings" "time" "github.com/bloxapp/ssv/operator/keystore" @@ -540,6 +542,22 @@ func setupSSVNetwork(logger *zap.Logger) (networkconfig.NetworkConfig, error) { return networkconfig.NetworkConfig{}, err } + // Overwrite DomainType if CustomTypeDomain is set. + if cfg.SSVOptions.CustomDomainType != "" { + if !strings.HasPrefix(cfg.SSVOptions.CustomDomainType, "0x") { + return networkconfig.NetworkConfig{}, errors.New("custom domain type must be a hex string") + } + byts, err := hex.DecodeString(cfg.SSVOptions.CustomDomainType[2:]) + if err != nil { + return networkconfig.NetworkConfig{}, errors.Wrap(err, "failed to decode custom domain type") + } + if len(byts) != 4 { + return networkconfig.NetworkConfig{}, errors.New("custom domain type must be 4 bytes") + } + networkConfig.Domain = spectypes.DomainType(byts) + logger.Info("running with custom domain type", fields.Domain(networkConfig.Domain)) + } + types.SetDefaultDomain(networkConfig.Domain) nodeType := "light" @@ -572,7 +590,11 @@ func setupP2P(logger *zap.Logger, db basedb.Database, mr metricsreporter.Metrics } cfg.P2pNetworkConfig.NetworkPrivateKey = netPrivKey - return p2pv1.New(logger, &cfg.P2pNetworkConfig, mr) + p2pNetwork, err := p2pv1.New(logger, &cfg.P2pNetworkConfig, mr) + if err != nil { + logger.Fatal("failed to setup p2p network", zap.Error(err)) + } + return p2pNetwork } func setupConsensusClient( diff --git a/network/discovery/dv5_service.go b/network/discovery/dv5_service.go index 6842015d9b..81dc6f7f4f 100644 --- a/network/discovery/dv5_service.go +++ b/network/discovery/dv5_service.go @@ -148,12 +148,12 @@ var zeroSubnets, _ = records.Subnets{}.FromString(records.ZeroSubnets) func (dvs *DiscV5Service) checkPeer(logger *zap.Logger, e PeerEvent) error { // Get the peer's domain type, skipping if it mismatches ours. - // TODO: uncomment errors once there are sufficient nodes with domain type. nodeDomainType, err := records.GetDomainTypeEntry(e.Node.Record()) if err != nil { - // TODO: skip missing domain type (likely old node). - } else if nodeDomainType != dvs.domainType { - // TODO: skip different domain type. + return fmt.Errorf("could not read domain type: %w", err) + } + if nodeDomainType != dvs.domainType { + return fmt.Errorf("mismatched domain type: %x", nodeDomainType) } // Get the peer's subnets, skipping if it has none. diff --git a/network/discovery/dv5_service_test.go b/network/discovery/dv5_service_test.go index 7e0ed5c3ba..90293b7c02 100644 --- a/network/discovery/dv5_service_test.go +++ b/network/discovery/dv5_service_test.go @@ -35,13 +35,13 @@ func TestCheckPeer(t *testing.T) { name: "missing domain type", domainType: nil, subnets: mySubnets, - expectedError: nil, + expectedError: errors.New("could not read domain type: not found"), }, { name: "different domain type", domainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x5}, subnets: mySubnets, - expectedError: nil, + expectedError: errors.New("mismatched domain type: 01020305"), }, { name: "missing subnets", diff --git a/network/p2p/config.go b/network/p2p/config.go index f9355f87fe..60307b5f51 100644 --- a/network/p2p/config.go +++ b/network/p2p/config.go @@ -34,9 +34,10 @@ const ( // Config holds the configuration options for p2p network type Config struct { - Ctx context.Context - Bootnodes string `yaml:"Bootnodes" env:"BOOTNODES" env-description:"Bootnodes to use to start discovery, seperated with ';'" env-default:""` - Discovery string `yaml:"Discovery" env:"P2P_DISCOVERY" env-description:"Discovery system to use" env-default:"discv5"` + Ctx context.Context + Bootnodes string `yaml:"Bootnodes" env:"BOOTNODES" env-description:"Bootnodes to use to start discovery, seperated with ';'" env-default:""` + Discovery string `yaml:"Discovery" env:"P2P_DISCOVERY" env-description:"Discovery system to use" env-default:"discv5"` + TrustedPeers []string `yaml:"TrustedPeers" env:"TRUSTED_PEERS" env-default:"" env-description:"List of peers to connect to."` TCPPort int `yaml:"TcpPort" env:"TCP_PORT" env-default:"13001" env-description:"TCP port for p2p transport"` UDPPort int `yaml:"UdpPort" env:"UDP_PORT" env-default:"12001" env-description:"UDP port for discovery"` diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 683ebf7e40..164c07261f 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -2,6 +2,8 @@ package p2pv1 import ( "context" + "fmt" + "strings" "sync/atomic" "time" @@ -11,6 +13,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" libp2pdiscbackoff "github.com/libp2p/go-libp2p/p2p/discovery/backoff" + ma "github.com/multiformats/go-multiaddr" "go.uber.org/zap" "github.com/bloxapp/ssv/logging" @@ -66,6 +69,7 @@ type p2pNetwork struct { msgValidator validation.MessageValidator connHandler connections.ConnHandler connGater connmgr.ConnectionGater + trustedPeers []*peer.AddrInfo metrics Metrics state int32 @@ -83,12 +87,12 @@ type p2pNetwork struct { } // New creates a new p2p network -func New(logger *zap.Logger, cfg *Config, mr Metrics) network.P2PNetwork { +func New(logger *zap.Logger, cfg *Config, mr Metrics) (network.P2PNetwork, error) { ctx, cancel := context.WithCancel(cfg.Ctx) logger = logger.Named(logging.NameP2PNetwork) - return &p2pNetwork{ + n := &p2pNetwork{ parentCtx: cfg.Ctx, ctx: ctx, cancel: cancel, @@ -104,6 +108,30 @@ func New(logger *zap.Logger, cfg *Config, mr Metrics) network.P2PNetwork { operatorDataStore: cfg.OperatorDataStore, metrics: mr, } + if err := n.parseTrustedPeers(); err != nil { + return nil, err + } + return n, nil +} + +func (n *p2pNetwork) parseTrustedPeers() error { + // Group addresses by peer ID. + trustedPeers := map[peer.ID][]ma.Multiaddr{} + if len(n.cfg.TrustedPeers) > 0 { + for _, mas := range n.cfg.TrustedPeers { + for _, ma := range strings.Split(mas, ",") { + addrInfo, err := peer.AddrInfoFromString(ma) + if err != nil { + return fmt.Errorf("could not parse trusted peer: %w", err) + } + trustedPeers[addrInfo.ID] = append(trustedPeers[addrInfo.ID], addrInfo.Addrs...) + } + } + } + for id, addrs := range trustedPeers { + n.trustedPeers = append(n.trustedPeers, &peer.AddrInfo{ID: id, Addrs: addrs}) + } + return nil } // Host implements HostProvider @@ -164,9 +192,41 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error { return nil } - logger.Info("starting") + connector := make(chan peer.AddrInfo, connectorQueueSize) + go func() { + // Wait for own subnets to be subscribed to and updated. + // TODO: wait more intelligently with a channel. + time.Sleep(8 * time.Second) - go n.startDiscovery(logger) + ctx, cancel := context.WithCancel(n.ctx) + defer cancel() + n.backoffConnector.Connect(ctx, connector) + }() + + // Connect to trusted peers first. + go func() { + for _, addrInfo := range n.trustedPeers { + connector <- *addrInfo + } + }() + + ma, err := peer.AddrInfoToP2pAddrs(&peer.AddrInfo{ + ID: n.host.ID(), + Addrs: n.host.Addrs(), + }) + if err != nil { + logger.Fatal("could not get my address", zap.Error(err)) + } + maStrs := make([]string, len(ma)) + for i, ima := range ma { + maStrs[i] = ima.String() + } + logger.Info("starting p2p", + zap.String("my_address", strings.Join(maStrs, ",")), + zap.Int("trusted_peers", len(n.trustedPeers)), + ) + + go n.startDiscovery(logger, connector) async.Interval(n.ctx, connManagerGCInterval, n.peersBalancing(logger)) // don't report metrics in tests @@ -206,20 +266,14 @@ func (n *p2pNetwork) peersBalancing(logger *zap.Logger) func() { // startDiscovery starts the required services // it will try to bootstrap discovery service, and inject a connect function. // the connect function checks if we can connect to the given peer and if so passing it to the backoff connector. -func (n *p2pNetwork) startDiscovery(logger *zap.Logger) { - discoveredPeers := make(chan peer.AddrInfo, connectorQueueSize) - go func() { - ctx, cancel := context.WithCancel(n.ctx) - defer cancel() - n.backoffConnector.Connect(ctx, discoveredPeers) - }() +func (n *p2pNetwork) startDiscovery(logger *zap.Logger, connector chan peer.AddrInfo) { err := tasks.Retry(func() error { return n.disc.Bootstrap(logger, func(e discovery.PeerEvent) { if !n.idx.CanConnect(e.AddrInfo.ID) { return } select { - case discoveredPeers <- e.AddrInfo: + case connector <- e.AddrInfo: default: logger.Warn("connector queue is full, skipping new peer", fields.PeerID(e.AddrInfo.ID)) } diff --git a/network/p2p/test_utils.go b/network/p2p/test_utils.go index 31e763a378..475af28c2c 100644 --- a/network/p2p/test_utils.go +++ b/network/p2p/test_utils.go @@ -182,7 +182,10 @@ func (ln *LocalNet) NewTestP2pNetwork(ctx context.Context, nodeIndex int, keys t cfg.OperatorDataStore = operatordatastore.New(®istrystorage.OperatorData{ID: spectypes.OperatorID(nodeIndex + 1)}) mr := metricsreporter.New() - p := New(logger, cfg, mr) + p, err := New(logger, cfg, mr) + if err != nil { + return nil, err + } err = p.Setup(logger) if err != nil { return nil, err diff --git a/operator/node.go b/operator/node.go index 89e7b9a758..16f965c413 100644 --- a/operator/node.go +++ b/operator/node.go @@ -34,6 +34,7 @@ type Node interface { type Options struct { // NetworkName is the network name of this node NetworkName string `yaml:"Network" env:"NETWORK" env-default:"mainnet" env-description:"Network is the network of this node"` + CustomDomainType string `yaml:"CustomDomainType" env:"CUSTOM_DOMAIN_TYPE" env-default:"" env-description:"Override the SSV domain type. This is used to isolate the node from the rest of the network. Do not set unless you know what you are doing. Example: 0x01020304"` Network networkconfig.NetworkConfig BeaconNode beaconprotocol.BeaconNode // TODO: consider renaming to ConsensusClient ExecutionClient *executionclient.ExecutionClient diff --git a/registry/storage/shares.go b/registry/storage/shares.go index 41431ce05f..b18f6c36a8 100644 --- a/registry/storage/shares.go +++ b/registry/storage/shares.go @@ -82,6 +82,7 @@ func (s *sharesStorage) load() error { if err := val.Decode(obj.Value); err != nil { return fmt.Errorf("failed to deserialize share: %w", err) } + val.DomainType = types.GetDefaultDomain() s.shares[hex.EncodeToString(val.ValidatorPubKey)] = val return nil })