Skip to content

Commit

Permalink
Merge branch 'main' into chore/add-shutdown-cw-wormhole-testing
Browse files Browse the repository at this point in the history
  • Loading branch information
kakucodes authored Mar 5, 2025
2 parents 96edcd3 + d36a16c commit ddec10c
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 20 deletions.
8 changes: 8 additions & 0 deletions node/cmd/ccq/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -52,6 +53,7 @@ func runP2P(
monitorPeers bool,
loggingMap *LoggingMap,
gossipAdvertiseAddress string,
protectedPeers []string,
) (*P2PSub, error) {
// p2p setup
components := p2p.DefaultComponents()
Expand All @@ -63,6 +65,12 @@ func runP2P(
return nil, err
}

if len(protectedPeers) != 0 {
for _, peerId := range protectedPeers {
components.ConnMgr.Protect(peer.ID(peerId), "configured")
}
}

topic_req := fmt.Sprintf("%s/%s", networkID, "ccq_req")
topic_resp := fmt.Sprintf("%s/%s", networkID, "ccq_resp")

Expand Down
4 changes: 3 additions & 1 deletion node/cmd/ccq/query_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
p2pNetworkID *string
p2pPort *uint
p2pBootstrap *string
protectedPeers []string
listenAddr *string
nodeKeyPath *string
signerKeyPath *string
Expand All @@ -57,6 +58,7 @@ func init() {
p2pNetworkID = QueryServerCmd.Flags().String("network", "", "P2P network identifier (optional, overrides default for environment)")
p2pPort = QueryServerCmd.Flags().Uint("port", 8995, "P2P UDP listener port")
p2pBootstrap = QueryServerCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (optional for testnet or mainnet, overrides default, required for devnet)")
QueryServerCmd.Flags().StringSliceVarP(&protectedPeers, "protectedPeers", "", []string{}, "")
nodeKeyPath = QueryServerCmd.Flags().String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)")
signerKeyPath = QueryServerCmd.Flags().String("signerKey", "", "Path to key used to sign unsigned queries")
listenAddr = QueryServerCmd.Flags().String("listenAddr", "[::]:6069", "Listen address for query server (disabled if blank)")
Expand Down Expand Up @@ -204,7 +206,7 @@ func runQueryServer(cmd *cobra.Command, args []string) {

// Run p2p
pendingResponses := NewPendingResponses(logger)
p2p, err := runP2P(ctx, priv, *p2pPort, networkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger, *monitorPeers, loggingMap, *gossipAdvertiseAddress)
p2p, err := runP2P(ctx, priv, *p2pPort, networkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger, *monitorPeers, loggingMap, *gossipAdvertiseAddress, protectedPeers)
if err != nil {
logger.Fatal("Failed to start p2p", zap.Error(err))
}
Expand Down
12 changes: 8 additions & 4 deletions node/cmd/guardiand/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ import (
)

var (
p2pNetworkID *string
p2pPort *uint
p2pBootstrap *string
p2pNetworkID *string
p2pPort *uint
p2pBootstrap *string
protectedPeers []string

nodeKeyPath *string

Expand Down Expand Up @@ -262,6 +263,7 @@ var (
ccqAllowedRequesters *string
ccqP2pPort *uint
ccqP2pBootstrap *string
ccqProtectedPeers []string
ccqAllowedPeers *string
ccqBackfillCache *bool

Expand All @@ -282,6 +284,7 @@ func init() {
p2pNetworkID = NodeCmd.Flags().String("network", "", "P2P network identifier (optional, overrides default for environment)")
p2pPort = NodeCmd.Flags().Uint("port", p2p.DefaultPort, "P2P UDP listener port")
p2pBootstrap = NodeCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (optional for mainnet or testnet, overrides default, required for unsafeDevMode)")
NodeCmd.Flags().StringSliceVarP(&protectedPeers, "protectedPeers", "", []string{}, "")

statusAddr = NodeCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")

Expand Down Expand Up @@ -491,6 +494,7 @@ func init() {
ccqAllowedRequesters = NodeCmd.Flags().String("ccqAllowedRequesters", "", "Comma separated list of signers allowed to submit cross chain queries")
ccqP2pPort = NodeCmd.Flags().Uint("ccqP2pPort", 8996, "CCQ P2P UDP listener port")
ccqP2pBootstrap = NodeCmd.Flags().String("ccqP2pBootstrap", "", "CCQ P2P bootstrap peers (optional for mainnet or testnet, overrides default, required for unsafeDevMode)")
NodeCmd.Flags().StringSliceVarP(&ccqProtectedPeers, "ccqProtectedPeers", "", []string{}, "")
ccqAllowedPeers = NodeCmd.Flags().String("ccqAllowedPeers", "", "CCQ allowed P2P peers (comma-separated)")
ccqBackfillCache = NodeCmd.Flags().Bool("ccqBackfillCache", true, "Should EVM chains backfill CCQ timestamp cache on startup")
gossipAdvertiseAddress = NodeCmd.Flags().String("gossipAdvertiseAddress", "", "External IP to advertize on Guardian and CCQ p2p (use if behind a NAT or running in k8s)")
Expand Down Expand Up @@ -1777,7 +1781,7 @@ func runNode(cmd *cobra.Command, args []string) {
node.GuardianOptionGatewayRelayer(*gatewayRelayerContract, gatewayRelayerWormchainConn),
node.GuardianOptionQueryHandler(*ccqEnabled, *ccqAllowedRequesters),
node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap),
node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *subscribeToVAAs, *disableHeartbeatVerify, *p2pPort, *ccqP2pBootstrap, *ccqP2pPort, *ccqAllowedPeers, *gossipAdvertiseAddress, ibc.GetFeatures),
node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *subscribeToVAAs, *disableHeartbeatVerify, *p2pPort, *ccqP2pBootstrap, *ccqP2pPort, *ccqAllowedPeers, *gossipAdvertiseAddress, ibc.GetFeatures, protectedPeers, ccqProtectedPeers),
node.GuardianOptionStatusServer(*statusAddr),
node.GuardianOptionProcessor(*p2pNetworkID),
}
Expand Down
9 changes: 6 additions & 3 deletions node/cmd/spy/spy.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ var (
var (
envStr *string

p2pNetworkID *string
p2pPort *uint
p2pBootstrap *string
p2pNetworkID *string
p2pPort *uint
p2pBootstrap *string
protectedPeers []string

statusAddr *string

Expand All @@ -59,6 +60,7 @@ func init() {
p2pNetworkID = SpyCmd.Flags().String("network", "", "P2P network identifier (optional for testnet or mainnet, overrides default, required for devnet)")
p2pPort = SpyCmd.Flags().Uint("port", 8999, "P2P UDP listener port")
p2pBootstrap = SpyCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (optional for testnet or mainnet, overrides default, required for devnet)")
SpyCmd.Flags().StringSliceVarP(&protectedPeers, "protectedPeers", "", []string{}, "")

statusAddr = SpyCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")

Expand Down Expand Up @@ -396,6 +398,7 @@ func runSpy(cmd *cobra.Command, args []string) {
rootCtxCancel,
p2p.WithSignedVAAListener(signedInC),
p2p.WithComponents(components),
p2p.WithProtectedPeers(protectedPeers),
)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func mockGuardianRunnable(t testing.TB, gs []*mockGuardian, mockGuardianIndex ui
GuardianOptionNoAccountant(), // disable accountant
GuardianOptionGovernor(true, false, ""),
GuardianOptionGatewayRelayer("", nil), // disable gateway relayer
GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, false, cfg.p2pPort, "", 0, "", "", func() string { return "" }),
GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, false, cfg.p2pPort, "", 0, "", "", func() string { return "" }, []string{}, []string{}),
GuardianOptionPublicRpcSocket(cfg.publicSocket, publicRpcLogDetail),
GuardianOptionPublicrpcTcpService(cfg.publicRpc, publicRpcLogDetail),
GuardianOptionPublicWeb(cfg.publicWeb, cfg.publicSocket, "", false, ""),
Expand Down
7 changes: 6 additions & 1 deletion node/pkg/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func GuardianOptionP2P(
ccqAllowedPeers string,
gossipAdvertiseAddress string,
ibcFeaturesFunc func() string,
protectedPeers []string,
ccqProtectedPeers []string,
) *GuardianOption {
return &GuardianOption{
name: "p2p",
Expand Down Expand Up @@ -101,7 +103,10 @@ func GuardianOptionP2P(
g.queryResponsePublicationC.readC,
ccqBootstrapPeers,
ccqPort,
ccqAllowedPeers),
ccqAllowedPeers,
protectedPeers,
ccqProtectedPeers,
),
p2p.WithProcessorFeaturesFunc(processor.GetFeatures),
)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions node/pkg/p2p/ccq_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (ccq *ccqP2p) run(
port uint,
signedQueryReqC chan<- *gossipv1.SignedQueryRequest,
queryResponseReadC <-chan *query.QueryResponsePublication,
protectedPeers []string,
errC chan error,
) error {
networkID := p2pNetworkID + "/ccq"
Expand All @@ -95,6 +96,12 @@ func (ccq *ccqP2p) run(
return fmt.Errorf("failed to create p2p: %w", err)
}

if len(protectedPeers) != 0 {
for _, peerId := range protectedPeers {
components.ConnMgr.Protect(peer.ID(peerId), "configured")
}
}

// Build a map of bootstrap peers so we can always allow subscribe requests from them.
bootstrapPeersMap := map[string]struct{}{}
bootstrappers, _ := BootstrapAddrs(ccq.logger, bootstrapPeers, ccq.h.ID())
Expand Down
9 changes: 8 additions & 1 deletion node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,13 @@ func Run(params *RunParams) func(ctx context.Context) error {
}
}()

if len(params.protectedPeers) != 0 {
for _, peerId := range params.protectedPeers {
logger.Info("protecting peer", zap.String("peerId", peerId))
params.components.ConnMgr.Protect(peer.ID(peerId), "configured")
}
}

nodeIdBytes, err := h.ID().Marshal()
if err != nil {
panic(err)
Expand Down Expand Up @@ -462,7 +469,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
if params.ccqEnabled {
ccqErrC := make(chan error)
ccq := newCcqRunP2p(logger, params.ccqAllowedPeers, params.components)
if err := ccq.run(ctx, params.priv, params.guardianSigner, params.networkID, params.ccqBootstrapPeers, params.ccqPort, params.signedQueryReqC, params.queryResponseReadC, ccqErrC); err != nil {
if err := ccq.run(ctx, params.priv, params.guardianSigner, params.networkID, params.ccqBootstrapPeers, params.ccqPort, params.signedQueryReqC, params.queryResponseReadC, params.ccqProtectedPeers, ccqErrC); err != nil {
return fmt.Errorf("failed to start p2p for CCQ: %w", err)
}
defer ccq.close()
Expand Down
22 changes: 22 additions & 0 deletions node/pkg/p2p/run_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type (
ccqBootstrapPeers string
ccqPort uint
ccqAllowedPeers string
protectedPeers []string
ccqProtectedPeers []string
}

// RunOpt is used to specify optional parameters.
Expand Down Expand Up @@ -162,6 +164,22 @@ func WithDisableHeartbeatVerify(disableHeartbeatVerify bool) RunOpt {
}
}

// WithProtectedPeers is used to set the protected peers.
func WithProtectedPeers(protectedPeers []string) RunOpt {
return func(p *RunParams) error {
p.protectedPeers = protectedPeers
return nil
}
}

// WithCcqProtectedPeers is used to set the protected peers for CCQ.
func WithCcqProtectedPeers(ccqProtectedPeers []string) RunOpt {
return func(p *RunParams) error {
p.ccqProtectedPeers = ccqProtectedPeers
return nil
}
}

// WithGuardianOptions is used to set options that are only meaningful to the guardian.
func WithGuardianOptions(
nodeName string,
Expand All @@ -185,6 +203,8 @@ func WithGuardianOptions(
ccqBootstrapPeers string,
ccqPort uint,
ccqAllowedPeers string,
protectedPeers []string,
ccqProtectedPeers []string,
) RunOpt {
return func(p *RunParams) error {
p.nodeName = nodeName
Expand All @@ -208,6 +228,8 @@ func WithGuardianOptions(
p.ccqBootstrapPeers = ccqBootstrapPeers
p.ccqPort = ccqPort
p.ccqAllowedPeers = ccqAllowedPeers
p.protectedPeers = protectedPeers
p.ccqProtectedPeers = ccqProtectedPeers
return nil
}
}
Expand Down
67 changes: 66 additions & 1 deletion node/pkg/p2p/run_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,57 @@ func TestRunParamsWithDisableHeartbeatVerify(t *testing.T) {
assert.True(t, params.disableHeartbeatVerify)
}

func TestRunParamsWithProtectedPeers(t *testing.T) {
priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1)
require.NoError(t, err)
gst := common.NewGuardianSetState(nil)
_, rootCtxCancel := context.WithCancel(context.Background())
defer rootCtxCancel()

protectedPeers := []string{"peer1", "peer2", "peer3"}
params, err := NewRunParams(
bootstrapPeers,
networkId,
priv,
gst,
rootCtxCancel,
WithProtectedPeers(protectedPeers),
)

require.NoError(t, err)
require.NotNil(t, params)

require.Equal(t, len(protectedPeers), len(params.protectedPeers))
assert.Equal(t, protectedPeers[0], params.protectedPeers[0])
assert.Equal(t, protectedPeers[1], params.protectedPeers[1])
assert.Equal(t, protectedPeers[2], params.protectedPeers[2])
}

func TestRunParamsWithCcqProtectedPeers(t *testing.T) {
priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1)
require.NoError(t, err)
gst := common.NewGuardianSetState(nil)
_, rootCtxCancel := context.WithCancel(context.Background())
defer rootCtxCancel()

ccqProtectedPeers := []string{"peerA", "peerB"}
params, err := NewRunParams(
bootstrapPeers,
networkId,
priv,
gst,
rootCtxCancel,
WithCcqProtectedPeers(ccqProtectedPeers),
)

require.NoError(t, err)
require.NotNil(t, params)

require.Equal(t, len(ccqProtectedPeers), len(params.ccqProtectedPeers))
assert.Equal(t, ccqProtectedPeers[0], params.ccqProtectedPeers[0])
assert.Equal(t, ccqProtectedPeers[1], params.ccqProtectedPeers[1])
}

func TestRunParamsWithGuardianOptions(t *testing.T) {
priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1)
require.NoError(t, err)
Expand Down Expand Up @@ -159,6 +210,8 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
ccqBootstrapPeers := "some bootstrap string"
ccqPort := uint(4242)
ccqAllowedPeers := "some allowed peers"
protectedPeers := []string{"peer1", "peer2", "peer3"}
ccqProtectedPeers := []string{"peerA", "peerB"}

params, err := NewRunParams(
bootstrapPeers,
Expand Down Expand Up @@ -187,7 +240,10 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
queryResponseReadC,
ccqBootstrapPeers,
ccqPort,
ccqAllowedPeers),
ccqAllowedPeers,
protectedPeers,
ccqProtectedPeers,
),
)

require.NoError(t, err)
Expand All @@ -210,4 +266,13 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
assert.Equal(t, ccqBootstrapPeers, params.ccqBootstrapPeers)
assert.Equal(t, ccqPort, params.ccqPort)
assert.Equal(t, ccqAllowedPeers, params.ccqAllowedPeers)

require.Equal(t, len(protectedPeers), len(params.protectedPeers))
assert.Equal(t, protectedPeers[0], params.protectedPeers[0])
assert.Equal(t, protectedPeers[1], params.protectedPeers[1])
assert.Equal(t, protectedPeers[2], params.protectedPeers[2])

require.Equal(t, len(ccqProtectedPeers), len(params.ccqProtectedPeers))
assert.Equal(t, ccqProtectedPeers[0], params.ccqProtectedPeers[0])
assert.Equal(t, ccqProtectedPeers[1], params.ccqProtectedPeers[1])
}
18 changes: 10 additions & 8 deletions node/pkg/p2p/watermark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,16 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) {
g.gov,
g.disableHeartbeatVerify,
g.components,
nil, //g.ibcFeaturesFunc,
false, // gateway relayer enabled
false, // ccqEnabled
nil, // signed query request channel
nil, // query response channel
"", // query bootstrap peers
0, // query port
"", // query allowed peers),
nil, //g.ibcFeaturesFunc,
false, // gateway relayer enabled
false, // ccqEnabled
nil, // signed query request channel
nil, // query response channel
"", // query bootstrap peers
0, // query port
"", // query allowed peers),
[]string{}, // protected peers
[]string{}, // ccq protected peers
))
require.NoError(t, err)

Expand Down

0 comments on commit ddec10c

Please sign in to comment.