Skip to content

Commit

Permalink
Custom DomainType and Trusted Peers (#1493)
Browse files Browse the repository at this point in the history
* enable custom domain support

* update domain type while loading shares

* print self address

* new address format

* fix unit test

* parse trusted peers in p2p.New

---------

Co-authored-by: Lior Rutenberg <[email protected]>
Co-authored-by: rehs0y <[email protected]>
  • Loading branch information
3 people authored Jul 23, 2024
1 parent 4068234 commit 2eea503
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 23 deletions.
24 changes: 23 additions & 1 deletion cli/operator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package operator
import (
"context"
"encoding/base64"
"encoding/hex"
"fmt"
"log"
"math/big"
"net/http"
"os"
"strings"
"time"

"github.com/bloxapp/ssv/operator/keystore"
Expand Down Expand Up @@ -540,6 +542,22 @@ func setupSSVNetwork(logger *zap.Logger) (networkconfig.NetworkConfig, error) {
return networkconfig.NetworkConfig{}, err
}

// Overwrite DomainType if CustomTypeDomain is set.
if cfg.SSVOptions.CustomDomainType != "" {
if !strings.HasPrefix(cfg.SSVOptions.CustomDomainType, "0x") {
return networkconfig.NetworkConfig{}, errors.New("custom domain type must be a hex string")
}
byts, err := hex.DecodeString(cfg.SSVOptions.CustomDomainType[2:])
if err != nil {
return networkconfig.NetworkConfig{}, errors.Wrap(err, "failed to decode custom domain type")
}
if len(byts) != 4 {
return networkconfig.NetworkConfig{}, errors.New("custom domain type must be 4 bytes")
}
networkConfig.Domain = spectypes.DomainType(byts)
logger.Info("running with custom domain type", fields.Domain(networkConfig.Domain))
}

types.SetDefaultDomain(networkConfig.Domain)

nodeType := "light"
Expand Down Expand Up @@ -572,7 +590,11 @@ func setupP2P(logger *zap.Logger, db basedb.Database, mr metricsreporter.Metrics
}
cfg.P2pNetworkConfig.NetworkPrivateKey = netPrivKey

return p2pv1.New(logger, &cfg.P2pNetworkConfig, mr)
p2pNetwork, err := p2pv1.New(logger, &cfg.P2pNetworkConfig, mr)
if err != nil {
logger.Fatal("failed to setup p2p network", zap.Error(err))
}
return p2pNetwork
}

func setupConsensusClient(
Expand Down
8 changes: 4 additions & 4 deletions network/discovery/dv5_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,12 @@ var zeroSubnets, _ = records.Subnets{}.FromString(records.ZeroSubnets)

func (dvs *DiscV5Service) checkPeer(logger *zap.Logger, e PeerEvent) error {
// Get the peer's domain type, skipping if it mismatches ours.
// TODO: uncomment errors once there are sufficient nodes with domain type.
nodeDomainType, err := records.GetDomainTypeEntry(e.Node.Record())
if err != nil {
// TODO: skip missing domain type (likely old node).
} else if nodeDomainType != dvs.domainType {
// TODO: skip different domain type.
return fmt.Errorf("could not read domain type: %w", err)
}
if nodeDomainType != dvs.domainType {
return fmt.Errorf("mismatched domain type: %x", nodeDomainType)
}

// Get the peer's subnets, skipping if it has none.
Expand Down
4 changes: 2 additions & 2 deletions network/discovery/dv5_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ func TestCheckPeer(t *testing.T) {
name: "missing domain type",
domainType: nil,
subnets: mySubnets,
expectedError: nil,
expectedError: errors.New("could not read domain type: not found"),
},
{
name: "different domain type",
domainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x5},
subnets: mySubnets,
expectedError: nil,
expectedError: errors.New("mismatched domain type: 01020305"),
},
{
name: "missing subnets",
Expand Down
7 changes: 4 additions & 3 deletions network/p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ const (

// Config holds the configuration options for p2p network
type Config struct {
Ctx context.Context
Bootnodes string `yaml:"Bootnodes" env:"BOOTNODES" env-description:"Bootnodes to use to start discovery, seperated with ';'" env-default:""`
Discovery string `yaml:"Discovery" env:"P2P_DISCOVERY" env-description:"Discovery system to use" env-default:"discv5"`
Ctx context.Context
Bootnodes string `yaml:"Bootnodes" env:"BOOTNODES" env-description:"Bootnodes to use to start discovery, seperated with ';'" env-default:""`
Discovery string `yaml:"Discovery" env:"P2P_DISCOVERY" env-description:"Discovery system to use" env-default:"discv5"`
TrustedPeers []string `yaml:"TrustedPeers" env:"TRUSTED_PEERS" env-default:"" env-description:"List of peers to connect to."`

TCPPort int `yaml:"TcpPort" env:"TCP_PORT" env-default:"13001" env-description:"TCP port for p2p transport"`
UDPPort int `yaml:"UdpPort" env:"UDP_PORT" env-default:"12001" env-description:"UDP port for discovery"`
Expand Down
78 changes: 66 additions & 12 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package p2pv1

import (
"context"
"fmt"
"strings"
"sync/atomic"
"time"

Expand All @@ -11,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
libp2pdiscbackoff "github.com/libp2p/go-libp2p/p2p/discovery/backoff"
ma "github.com/multiformats/go-multiaddr"
"go.uber.org/zap"

"github.com/bloxapp/ssv/logging"
Expand Down Expand Up @@ -66,6 +69,7 @@ type p2pNetwork struct {
msgValidator validation.MessageValidator
connHandler connections.ConnHandler
connGater connmgr.ConnectionGater
trustedPeers []*peer.AddrInfo
metrics Metrics

state int32
Expand All @@ -83,12 +87,12 @@ type p2pNetwork struct {
}

// New creates a new p2p network
func New(logger *zap.Logger, cfg *Config, mr Metrics) network.P2PNetwork {
func New(logger *zap.Logger, cfg *Config, mr Metrics) (network.P2PNetwork, error) {
ctx, cancel := context.WithCancel(cfg.Ctx)

logger = logger.Named(logging.NameP2PNetwork)

return &p2pNetwork{
n := &p2pNetwork{
parentCtx: cfg.Ctx,
ctx: ctx,
cancel: cancel,
Expand All @@ -104,6 +108,30 @@ func New(logger *zap.Logger, cfg *Config, mr Metrics) network.P2PNetwork {
operatorDataStore: cfg.OperatorDataStore,
metrics: mr,
}
if err := n.parseTrustedPeers(); err != nil {
return nil, err
}
return n, nil
}

func (n *p2pNetwork) parseTrustedPeers() error {
// Group addresses by peer ID.
trustedPeers := map[peer.ID][]ma.Multiaddr{}
if len(n.cfg.TrustedPeers) > 0 {
for _, mas := range n.cfg.TrustedPeers {
for _, ma := range strings.Split(mas, ",") {
addrInfo, err := peer.AddrInfoFromString(ma)
if err != nil {
return fmt.Errorf("could not parse trusted peer: %w", err)
}
trustedPeers[addrInfo.ID] = append(trustedPeers[addrInfo.ID], addrInfo.Addrs...)
}
}
}
for id, addrs := range trustedPeers {
n.trustedPeers = append(n.trustedPeers, &peer.AddrInfo{ID: id, Addrs: addrs})
}
return nil
}

// Host implements HostProvider
Expand Down Expand Up @@ -164,9 +192,41 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error {
return nil
}

logger.Info("starting")
connector := make(chan peer.AddrInfo, connectorQueueSize)
go func() {
// Wait for own subnets to be subscribed to and updated.
// TODO: wait more intelligently with a channel.
time.Sleep(8 * time.Second)

go n.startDiscovery(logger)
ctx, cancel := context.WithCancel(n.ctx)
defer cancel()
n.backoffConnector.Connect(ctx, connector)
}()

// Connect to trusted peers first.
go func() {
for _, addrInfo := range n.trustedPeers {
connector <- *addrInfo
}
}()

ma, err := peer.AddrInfoToP2pAddrs(&peer.AddrInfo{
ID: n.host.ID(),
Addrs: n.host.Addrs(),
})
if err != nil {
logger.Fatal("could not get my address", zap.Error(err))
}
maStrs := make([]string, len(ma))
for i, ima := range ma {
maStrs[i] = ima.String()
}
logger.Info("starting p2p",
zap.String("my_address", strings.Join(maStrs, ",")),
zap.Int("trusted_peers", len(n.trustedPeers)),
)

go n.startDiscovery(logger, connector)

async.Interval(n.ctx, connManagerGCInterval, n.peersBalancing(logger))
// don't report metrics in tests
Expand Down Expand Up @@ -206,20 +266,14 @@ func (n *p2pNetwork) peersBalancing(logger *zap.Logger) func() {
// startDiscovery starts the required services
// it will try to bootstrap discovery service, and inject a connect function.
// the connect function checks if we can connect to the given peer and if so passing it to the backoff connector.
func (n *p2pNetwork) startDiscovery(logger *zap.Logger) {
discoveredPeers := make(chan peer.AddrInfo, connectorQueueSize)
go func() {
ctx, cancel := context.WithCancel(n.ctx)
defer cancel()
n.backoffConnector.Connect(ctx, discoveredPeers)
}()
func (n *p2pNetwork) startDiscovery(logger *zap.Logger, connector chan peer.AddrInfo) {
err := tasks.Retry(func() error {
return n.disc.Bootstrap(logger, func(e discovery.PeerEvent) {
if !n.idx.CanConnect(e.AddrInfo.ID) {
return
}
select {
case discoveredPeers <- e.AddrInfo:
case connector <- e.AddrInfo:
default:
logger.Warn("connector queue is full, skipping new peer", fields.PeerID(e.AddrInfo.ID))
}
Expand Down
5 changes: 4 additions & 1 deletion network/p2p/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ func (ln *LocalNet) NewTestP2pNetwork(ctx context.Context, nodeIndex int, keys t
cfg.OperatorDataStore = operatordatastore.New(&registrystorage.OperatorData{ID: spectypes.OperatorID(nodeIndex + 1)})

mr := metricsreporter.New()
p := New(logger, cfg, mr)
p, err := New(logger, cfg, mr)
if err != nil {
return nil, err
}
err = p.Setup(logger)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions operator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Node interface {
type Options struct {
// NetworkName is the network name of this node
NetworkName string `yaml:"Network" env:"NETWORK" env-default:"mainnet" env-description:"Network is the network of this node"`
CustomDomainType string `yaml:"CustomDomainType" env:"CUSTOM_DOMAIN_TYPE" env-default:"" env-description:"Override the SSV domain type. This is used to isolate the node from the rest of the network. Do not set unless you know what you are doing. Example: 0x01020304"`
Network networkconfig.NetworkConfig
BeaconNode beaconprotocol.BeaconNode // TODO: consider renaming to ConsensusClient
ExecutionClient *executionclient.ExecutionClient
Expand Down
1 change: 1 addition & 0 deletions registry/storage/shares.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (s *sharesStorage) load() error {
if err := val.Decode(obj.Value); err != nil {
return fmt.Errorf("failed to deserialize share: %w", err)
}
val.DomainType = types.GetDefaultDomain()
s.shares[hex.EncodeToString(val.ValidatorPubKey)] = val
return nil
})
Expand Down

0 comments on commit 2eea503

Please sign in to comment.