Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoS NetworkManager Persistent Connector #942

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 193 additions & 0 deletions integration_testing/connection_controller_routines_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package integration_testing

import (
"fmt"
"github.com/deso-protocol/core/bls"
"github.com/deso-protocol/core/cmd"
"github.com/deso-protocol/core/collections"
"github.com/deso-protocol/core/lib"
"github.com/stretchr/testify/require"
"testing"
)

func TestConnectionControllerInitiatePersistentConnections(t *testing.T) {
require := require.New(t)
t.Cleanup(func() {
setGetActiveValidatorImpl(lib.BasicGetActiveValidators)
})

// NonValidator Node1 will set its --connect-ips to two non-validators node2 and node3,
// and two validators node4 and node5.
node1 := spawnNonValidatorNodeProtocol2(t, 18000, "node1")
node2 := spawnNonValidatorNodeProtocol2(t, 18001, "node2")
node3 := spawnNonValidatorNodeProtocol2(t, 18002, "node3")
blsPriv4, err := bls.NewPrivateKey()
require.NoError(err)
node4 := spawnValidatorNodeProtocol2(t, 18003, "node4", blsPriv4)
blsPriv5, err := bls.NewPrivateKey()
require.NoError(err)
node5 := spawnValidatorNodeProtocol2(t, 18004, "node5", blsPriv5)

node2 = startNode(t, node2)
node3 = startNode(t, node3)
node4 = startNode(t, node4)
node5 = startNode(t, node5)

setGetActiveValidatorImplWithValidatorNodes(t, node4, node5)

node1.Config.ConnectIPs = []string{
node2.Listeners[0].Addr().String(),
node3.Listeners[0].Addr().String(),
node4.Listeners[0].Addr().String(),
node5.Listeners[0].Addr().String(),
}
node1 = startNode(t, node1)
waitForNonValidatorOutboundConnection(t, node1, node2)
waitForNonValidatorOutboundConnection(t, node1, node3)
waitForValidatorConnection(t, node1, node4)
waitForValidatorConnection(t, node1, node5)
waitForValidatorConnection(t, node4, node5)
waitForCountRemoteNodeIndexer(t, node1, 4, 2, 2, 0)
waitForCountRemoteNodeIndexer(t, node2, 1, 0, 0, 1)
waitForCountRemoteNodeIndexer(t, node3, 1, 0, 0, 1)
waitForCountRemoteNodeIndexer(t, node4, 2, 1, 0, 1)
waitForCountRemoteNodeIndexer(t, node5, 2, 1, 0, 1)
node1.Stop()
t.Logf("Test #1 passed | Successfully run non-validator node1 with --connect-ips set to node2, node3, node4, node5")

// Now try again with a validator node6, with connect-ips set to node2, node3, node4, node5.
blsPriv6, err := bls.NewPrivateKey()
require.NoError(err)
node6 := spawnValidatorNodeProtocol2(t, 18005, "node6", blsPriv6)
node6.Config.ConnectIPs = []string{
node2.Listeners[0].Addr().String(),
node3.Listeners[0].Addr().String(),
node4.Listeners[0].Addr().String(),
node5.Listeners[0].Addr().String(),
}
node6 = startNode(t, node6)
setGetActiveValidatorImplWithValidatorNodes(t, node4, node5, node6)
waitForNonValidatorOutboundConnection(t, node6, node2)
waitForNonValidatorOutboundConnection(t, node6, node3)
waitForValidatorConnection(t, node6, node4)
waitForValidatorConnection(t, node6, node5)
waitForValidatorConnection(t, node4, node5)
waitForCountRemoteNodeIndexer(t, node6, 4, 2, 2, 0)
waitForCountRemoteNodeIndexer(t, node2, 1, 1, 0, 0)
waitForCountRemoteNodeIndexer(t, node3, 1, 1, 0, 0)
waitForCountRemoteNodeIndexer(t, node4, 2, 2, 0, 0)
waitForCountRemoteNodeIndexer(t, node5, 2, 2, 0, 0)
node2.Stop()
node3.Stop()
node4.Stop()
node5.Stop()
node6.Stop()
t.Logf("Test #2 passed | Successfully run validator node6 with --connect-ips set to node2, node3, node4, node5")
}

func TestConnectionControllerNonValidatorCircularConnectIps(t *testing.T) {
node1 := spawnNonValidatorNodeProtocol2(t, 18000, "node1")
node2 := spawnNonValidatorNodeProtocol2(t, 18001, "node2")

node1.Config.ConnectIPs = []string{"127.0.0.1:18001"}
node2.Config.ConnectIPs = []string{"127.0.0.1:18000"}

node1 = startNode(t, node1)
node2 = startNode(t, node2)
defer node1.Stop()
defer node2.Stop()

waitForCountRemoteNodeIndexer(t, node1, 2, 0, 1, 1)
waitForCountRemoteNodeIndexer(t, node2, 2, 0, 1, 1)
}

func setGetActiveValidatorImplWithValidatorNodes(t *testing.T, validators ...*cmd.Node) {
require := require.New(t)

mapping := collections.NewConcurrentMap[bls.SerializedPublicKey, *lib.ValidatorEntry]()
for _, validator := range validators {
seed := validator.Config.PosValidatorSeed
if seed == "" {
t.Fatalf("Validator node %s does not have a PosValidatorSeed set", validator.Params.UserAgent)
}
keystore, err := lib.NewBLSKeystore(seed)
require.NoError(err)
mapping.Set(keystore.GetSigner().GetPublicKey().Serialize(), createSimpleValidatorEntry(validator))
}
setGetActiveValidatorImpl(func() *collections.ConcurrentMap[bls.SerializedPublicKey, *lib.ValidatorEntry] {
return mapping
})
}

func setGetActiveValidatorImpl(mapping func() *collections.ConcurrentMap[bls.SerializedPublicKey, *lib.ValidatorEntry]) {
lib.GetActiveValidatorImpl = mapping
}

func createSimpleValidatorEntry(node *cmd.Node) *lib.ValidatorEntry {
return &lib.ValidatorEntry{
Domains: [][]byte{[]byte(node.Listeners[0].Addr().String())},
}
}

func waitForValidatorFullGraph(t *testing.T, validators ...*cmd.Node) {
for ii := 0; ii < len(validators); ii++ {
waitForValidatorConnectionOneWay(t, validators[ii], validators[ii+1:]...)
}
}

func waitForValidatorConnectionOneWay(t *testing.T, n *cmd.Node, validators ...*cmd.Node) {
if len(validators) == 0 {
return
}
for _, validator := range validators {
waitForValidatorConnection(t, n, validator)
}
}

func waitForNonValidatorInboundXOROutboundConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) {
userAgentN1 := node1.Params.UserAgent
userAgentN2 := node2.Params.UserAgent
conditionInbound := conditionNonValidatorInboundConnectionDynamic(t, node1, node2, true)
conditionOutbound := conditionNonValidatorOutboundConnectionDynamic(t, node1, node2, true)
xorCondition := func() bool {
return conditionInbound() != conditionOutbound()
}
waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to inbound XOR outbound non-validator Node (%s)",
userAgentN1, userAgentN2), xorCondition)
}

func waitForMinNonValidatorCountRemoteNodeIndexer(t *testing.T, node *cmd.Node, allCount int, validatorCount int,
minNonValidatorOutboundCount int, minNonValidatorInboundCount int) {

userAgent := node.Params.UserAgent
rnManager := node.Server.GetConnectionController().GetRemoteNodeManager()
condition := func() bool {
return checkRemoteNodeIndexerMinNonValidatorCount(rnManager, allCount, validatorCount,
minNonValidatorOutboundCount, minNonValidatorInboundCount)
}
waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to have at least %d non-validator outbound nodes and %d non-validator inbound nodes",
userAgent, minNonValidatorOutboundCount, minNonValidatorInboundCount), condition)
}

func checkRemoteNodeIndexerMinNonValidatorCount(manager *lib.RemoteNodeManager, allCount int, validatorCount int,
minNonValidatorOutboundCount int, minNonValidatorInboundCount int) bool {

if allCount != manager.GetAllRemoteNodes().Count() {
return false
}
if validatorCount != manager.GetValidatorIndex().Count() {
return false
}
if minNonValidatorOutboundCount > manager.GetNonValidatorOutboundIndex().Count() {
return false
}
if minNonValidatorInboundCount > manager.GetNonValidatorInboundIndex().Count() {
return false
}
if allCount != manager.GetValidatorIndex().Count()+
manager.GetNonValidatorOutboundIndex().Count()+
manager.GetNonValidatorInboundIndex().Count() {
return false
}
return true
}
12 changes: 8 additions & 4 deletions integration_testing/connection_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ func TestConnectionControllerPersistentConnection(t *testing.T) {

// Create a persistent connection from Node1 to Node2
cc := node1.Server.GetConnectionController()
require.NoError(cc.CreateNonValidatorPersistentOutboundConnection(node2.Listeners[0].Addr().String()))
_, err = cc.CreateNonValidatorPersistentOutboundConnection(node2.Listeners[0].Addr().String())
require.NoError(err)
waitForValidatorConnection(t, node1, node2)
waitForNonValidatorInboundConnection(t, node2, node1)
node2.Stop()
Expand All @@ -408,7 +409,8 @@ func TestConnectionControllerPersistentConnection(t *testing.T) {
node3 = startNode(t, node3)

// Create a persistent connection from Node1 to Node3
require.NoError(cc.CreateNonValidatorPersistentOutboundConnection(node3.Listeners[0].Addr().String()))
_, err = cc.CreateNonValidatorPersistentOutboundConnection(node3.Listeners[0].Addr().String())
require.NoError(err)
waitForNonValidatorOutboundConnection(t, node1, node3)
waitForNonValidatorInboundConnection(t, node3, node1)
node3.Stop()
Expand All @@ -429,7 +431,8 @@ func TestConnectionControllerPersistentConnection(t *testing.T) {

// Create a persistent connection from Node4 to Node5
cc = node4.Server.GetConnectionController()
require.NoError(cc.CreateNonValidatorPersistentOutboundConnection(node5.Listeners[0].Addr().String()))
_, err = cc.CreateNonValidatorPersistentOutboundConnection(node5.Listeners[0].Addr().String())
require.NoError(err)
waitForNonValidatorOutboundConnection(t, node4, node5)
waitForValidatorConnection(t, node5, node4)
node5.Stop()
Expand All @@ -444,7 +447,8 @@ func TestConnectionControllerPersistentConnection(t *testing.T) {
defer node6.Stop()

// Create a persistent connection from Node4 to Node6
require.NoError(cc.CreateNonValidatorPersistentOutboundConnection(node6.Listeners[0].Addr().String()))
_, err = cc.CreateNonValidatorPersistentOutboundConnection(node6.Listeners[0].Addr().String())
require.NoError(err)
waitForValidatorConnection(t, node4, node6)
waitForValidatorConnection(t, node6, node4)
t.Logf("Test #4 passed | Successfuly created persistent connection from validator Node4 to validator Node6")
Expand Down
52 changes: 36 additions & 16 deletions integration_testing/connection_controller_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,24 @@ func waitForValidatorConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node)
}
return true
}
waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to outbound non-validator Node (%s)", userAgentN1, userAgentN2), n1ValidatedN2)
waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to validator Node (%s)", userAgentN1, userAgentN2), n1ValidatedN2)
}

func waitForNonValidatorOutboundConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) {
userAgentN1 := node1.Params.UserAgent
userAgentN2 := node2.Params.UserAgent
condition := conditionNonValidatorOutboundConnection(t, node1, node2)
waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to outbound non-validator Node (%s)", userAgentN1, userAgentN2), condition)
}

func conditionNonValidatorOutboundConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) func() bool {
return conditionNonValidatorOutboundConnectionDynamic(t, node1, node2, false)
}

func conditionNonValidatorOutboundConnectionDynamic(t *testing.T, node1 *cmd.Node, node2 *cmd.Node, inactiveValidator bool) func() bool {
userAgentN2 := node2.Params.UserAgent
rnManagerN1 := node1.Server.GetConnectionController().GetRemoteNodeManager()
n1ValidatedN2 := func() bool {
return func() bool {
if true != checkRemoteNodeIndexerUserAgent(rnManagerN1, userAgentN2, false, true, false) {
return false
}
Expand All @@ -44,19 +54,29 @@ func waitForNonValidatorOutboundConnection(t *testing.T, node1 *cmd.Node, node2
if !rnFromN2.IsHandshakeCompleted() {
return false
}
if rnFromN2.GetValidatorPublicKey() != nil {
return false
// inactiveValidator should have the public key.
if inactiveValidator {
return rnFromN2.GetValidatorPublicKey() != nil
}
return true
return rnFromN2.GetValidatorPublicKey() == nil
}
waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to outbound non-validator Node (%s)", userAgentN1, userAgentN2), n1ValidatedN2)
}

func waitForNonValidatorInboundConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) {
userAgentN1 := node1.Params.UserAgent
userAgentN2 := node2.Params.UserAgent
condition := conditionNonValidatorInboundConnection(t, node1, node2)
waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to inbound non-validator Node (%s)", userAgentN1, userAgentN2), condition)
}

func conditionNonValidatorInboundConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) func() bool {
return conditionNonValidatorInboundConnectionDynamic(t, node1, node2, false)
}

func conditionNonValidatorInboundConnectionDynamic(t *testing.T, node1 *cmd.Node, node2 *cmd.Node, inactiveValidator bool) func() bool {
userAgentN2 := node2.Params.UserAgent
rnManagerN1 := node1.Server.GetConnectionController().GetRemoteNodeManager()
n1ValidatedN2 := func() bool {
return func() bool {
if true != checkRemoteNodeIndexerUserAgent(rnManagerN1, userAgentN2, false, false, true) {
return false
}
Expand All @@ -67,12 +87,12 @@ func waitForNonValidatorInboundConnection(t *testing.T, node1 *cmd.Node, node2 *
if !rnFromN2.IsHandshakeCompleted() {
return false
}
if rnFromN2.GetValidatorPublicKey() != nil {
return false
// inactiveValidator should have the public key.
if inactiveValidator {
return rnFromN2.GetValidatorPublicKey() != nil
}
return true
return rnFromN2.GetValidatorPublicKey() == nil
}
waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to inbound non-validator Node (%s)", userAgentN1, userAgentN2), n1ValidatedN2)
}

func waitForEmptyRemoteNodeIndexer(t *testing.T, node1 *cmd.Node) {
Expand All @@ -90,15 +110,15 @@ func waitForEmptyRemoteNodeIndexer(t *testing.T, node1 *cmd.Node) {
func waitForCountRemoteNodeIndexer(t *testing.T, node1 *cmd.Node, allCount int, validatorCount int,
nonValidatorOutboundCount int, nonValidatorInboundCount int) {

userAgentN1 := node1.Params.UserAgent
rnManagerN1 := node1.Server.GetConnectionController().GetRemoteNodeManager()
n1ValidatedN2 := func() bool {
if true != checkRemoteNodeIndexerCount(rnManagerN1, allCount, validatorCount, nonValidatorOutboundCount, nonValidatorInboundCount) {
userAgent := node1.Params.UserAgent
rnManager := node1.Server.GetConnectionController().GetRemoteNodeManager()
condition := func() bool {
if true != checkRemoteNodeIndexerCount(rnManager, allCount, validatorCount, nonValidatorOutboundCount, nonValidatorInboundCount) {
return false
}
return true
}
waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to have appropriate RemoteNodes counts", userAgentN1), n1ValidatedN2)
waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to have appropriate RemoteNodes counts", userAgent), condition)
}

func checkRemoteNodeIndexerUserAgent(manager *lib.RemoteNodeManager, userAgent string, validator bool,
Expand Down
2 changes: 1 addition & 1 deletion integration_testing/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func generateConfig(t *testing.T, port uint32, dataDir string, maxPeers uint32)
config.MaxSyncBlockHeight = 0
config.ConnectIPs = []string{}
config.PrivateMode = true
config.GlogV = 0
config.GlogV = 2
config.GlogVmodule = "*bitcoin_manager*=0,*balance*=0,*view*=0,*frontend*=0,*peer*=0,*addr*=0,*network*=0,*utils*=0,*connection*=0,*main*=0,*server*=0,*mempool*=0,*miner*=0,*blockchain*=0"
config.MaxInboundPeers = maxPeers
config.TargetOutboundPeers = maxPeers
Expand Down
Loading
Loading