From 143dcfc378bcd6748827a893bee577a4598a565c Mon Sep 17 00:00:00 2001 From: Piotr Nojszewski <29924594+AeonSw4n@users.noreply.github.com> Date: Thu, 18 Jan 2024 11:13:56 -0800 Subject: [PATCH] Revert "Code split" This reverts commit 104c3ec20a69f996bd93ce59f6ae10a0b72c3918. Some cleanup and documenting Cleanup; document more nit ConnectIps test Better connectIps test; simplify ConnectionController tests refactor --- .../connection_controller_routines_test.go | 394 ++++++++++++++++++ .../connection_controller_utils_test.go | 55 ++- integration_testing/tools.go | 2 +- lib/connection_controller.go | 345 ++++++++++++++- lib/connection_manager.go | 26 +- lib/handshake_controller.go | 2 +- lib/network_connection.go | 8 +- lib/remote_node.go | 30 +- lib/remote_node_manager.go | 25 +- lib/server.go | 9 +- 10 files changed, 843 insertions(+), 53 deletions(-) create mode 100644 integration_testing/connection_controller_routines_test.go diff --git a/integration_testing/connection_controller_routines_test.go b/integration_testing/connection_controller_routines_test.go new file mode 100644 index 000000000..521df04a6 --- /dev/null +++ b/integration_testing/connection_controller_routines_test.go @@ -0,0 +1,394 @@ +package integration_testing + +import ( + "fmt" + "github.com/deso-protocol/core/bls" + "github.com/deso-protocol/core/cmd" + "github.com/deso-protocol/core/collections" + "github.com/deso-protocol/core/lib" + "github.com/stretchr/testify/require" + "testing" +) + +func TestConnectionControllerInitiatePersistentConnections(t *testing.T) { + require := require.New(t) + t.Cleanup(func() { + setGetActiveValidatorImpl(lib.BasicGetActiveValidators) + }) + + // NonValidator Node1 will set its --connect-ips to two non-validators node2 and node3, + // and two validators node4 and node5. + node1 := spawnNonValidatorNodeProtocol2(t, 18000, "node1") + node2 := spawnNonValidatorNodeProtocol2(t, 18001, "node2") + node3 := spawnNonValidatorNodeProtocol2(t, 18002, "node3") + blsPriv4, err := bls.NewPrivateKey() + require.NoError(err) + node4 := spawnValidatorNodeProtocol2(t, 18003, "node4", blsPriv4) + blsPriv5, err := bls.NewPrivateKey() + require.NoError(err) + node5 := spawnValidatorNodeProtocol2(t, 18004, "node5", blsPriv5) + + node2 = startNode(t, node2) + node3 = startNode(t, node3) + node4 = startNode(t, node4) + node5 = startNode(t, node5) + + setGetActiveValidatorImplWithValidatorNodes(t, node4, node5) + + node1.Config.ConnectIPs = []string{ + node2.Listeners[0].Addr().String(), + node3.Listeners[0].Addr().String(), + node4.Listeners[0].Addr().String(), + node5.Listeners[0].Addr().String(), + } + node1 = startNode(t, node1) + waitForNonValidatorOutboundConnection(t, node1, node2) + waitForNonValidatorOutboundConnection(t, node1, node3) + waitForValidatorConnection(t, node1, node4) + waitForValidatorConnection(t, node1, node5) + waitForValidatorConnection(t, node4, node5) + waitForCountRemoteNodeIndexer(t, node1, 4, 2, 2, 0) + waitForCountRemoteNodeIndexer(t, node2, 1, 0, 0, 1) + waitForCountRemoteNodeIndexer(t, node3, 1, 0, 0, 1) + waitForCountRemoteNodeIndexer(t, node4, 2, 1, 0, 1) + waitForCountRemoteNodeIndexer(t, node5, 2, 1, 0, 1) + node1.Stop() + t.Logf("Test #1 passed | Successfully run non-validator node1 with --connect-ips set to node2, node3, node4, node5") + + // Now try again with a validator node6, with connect-ips set to node2, node3, node4, node5. + blsPriv6, err := bls.NewPrivateKey() + require.NoError(err) + node6 := spawnValidatorNodeProtocol2(t, 18005, "node6", blsPriv6) + node6.Config.ConnectIPs = []string{ + node2.Listeners[0].Addr().String(), + node3.Listeners[0].Addr().String(), + node4.Listeners[0].Addr().String(), + node5.Listeners[0].Addr().String(), + } + node6 = startNode(t, node6) + setGetActiveValidatorImplWithValidatorNodes(t, node4, node5, node6) + waitForNonValidatorOutboundConnection(t, node6, node2) + waitForNonValidatorOutboundConnection(t, node6, node3) + waitForValidatorConnection(t, node6, node4) + waitForValidatorConnection(t, node6, node5) + waitForValidatorConnection(t, node4, node5) + waitForCountRemoteNodeIndexer(t, node6, 4, 2, 2, 0) + waitForCountRemoteNodeIndexer(t, node2, 1, 1, 0, 0) + waitForCountRemoteNodeIndexer(t, node3, 1, 1, 0, 0) + waitForCountRemoteNodeIndexer(t, node4, 2, 2, 0, 0) + waitForCountRemoteNodeIndexer(t, node5, 2, 2, 0, 0) + node2.Stop() + node3.Stop() + node4.Stop() + node5.Stop() + node6.Stop() + t.Logf("Test #2 passed | Successfully run validator node6 with --connect-ips set to node2, node3, node4, node5") +} + +func TestConnectionControllerValidatorConnector(t *testing.T) { + require := require.New(t) + t.Cleanup(func() { + setGetActiveValidatorImpl(lib.BasicGetActiveValidators) + }) + + // Spawn 5 validators node1, node2, node3, node4, node5 and two non-validators node6 and node7. + // All the validators are initially in the validator set. And later, node1 and node2 will be removed from the + // validator set. Then, make node3 inactive, and node2 active again. Then, make all the validators inactive. + // Make node6, and node7 connect-ips to all the validators. + + blsPriv1, err := bls.NewPrivateKey() + require.NoError(err) + node1 := spawnValidatorNodeProtocol2(t, 18000, "node1", blsPriv1) + blsPriv2, err := bls.NewPrivateKey() + require.NoError(err) + node2 := spawnValidatorNodeProtocol2(t, 18001, "node2", blsPriv2) + blsPriv3, err := bls.NewPrivateKey() + require.NoError(err) + node3 := spawnValidatorNodeProtocol2(t, 18002, "node3", blsPriv3) + blsPriv4, err := bls.NewPrivateKey() + require.NoError(err) + node4 := spawnValidatorNodeProtocol2(t, 18003, "node4", blsPriv4) + blsPriv5, err := bls.NewPrivateKey() + require.NoError(err) + node5 := spawnValidatorNodeProtocol2(t, 18004, "node5", blsPriv5) + + node6 := spawnNonValidatorNodeProtocol2(t, 18005, "node6") + node7 := spawnNonValidatorNodeProtocol2(t, 18006, "node7") + + node1 = startNode(t, node1) + defer node1.Stop() + node2 = startNode(t, node2) + defer node2.Stop() + node3 = startNode(t, node3) + defer node3.Stop() + node4 = startNode(t, node4) + defer node4.Stop() + node5 = startNode(t, node5) + defer node5.Stop() + setGetActiveValidatorImplWithValidatorNodes(t, node1, node2, node3, node4, node5) + + node6.Config.ConnectIPs = []string{ + node1.Listeners[0].Addr().String(), + node2.Listeners[0].Addr().String(), + node3.Listeners[0].Addr().String(), + node4.Listeners[0].Addr().String(), + node5.Listeners[0].Addr().String(), + } + node7.Config.ConnectIPs = node6.Config.ConnectIPs + node6 = startNode(t, node6) + defer node6.Stop() + node7 = startNode(t, node7) + defer node7.Stop() + + // Verify full graph between active validators. + waitForValidatorFullGraph(t, node1, node2, node3, node4, node5) + // Verify connections of non-validators. + for _, nonValidator := range []*cmd.Node{node6, node7} { + waitForValidatorConnectionOneWay(t, nonValidator, node1, node2, node3, node4, node5) + } + // Verify connections of initial validators. + for _, validator := range []*cmd.Node{node1, node2, node3, node4, node5} { + waitForNonValidatorInboundConnection(t, validator, node6) + waitForNonValidatorInboundConnection(t, validator, node7) + } + // Verify connection counts of active validators. + for _, validator := range []*cmd.Node{node1, node2, node3, node4, node5} { + waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 4, 0, 2) + } + // NOOP Verify connection counts of inactive validators. + // Verify connection counts of non-validators. + waitForCountRemoteNodeIndexer(t, node6, 5, 5, 0, 0) + waitForCountRemoteNodeIndexer(t, node7, 5, 5, 0, 0) + t.Logf("Test #1 passed | Successfully run validators node1, node2, node3, node4, node5; non-validators node6, node7") + + // Remove node1 and node2 from the validator set. + setGetActiveValidatorImplWithValidatorNodes(t, node3, node4, node5) + // Verify full graph between active validators. + waitForValidatorFullGraph(t, node3, node4, node5) + // Verify connections of non-validators. + for _, nonValidator := range []*cmd.Node{node1, node2, node6, node7} { + waitForValidatorConnectionOneWay(t, nonValidator, node3, node4, node5) + } + // Verify connections of initial validators. + for _, validator := range []*cmd.Node{node1, node2, node3, node4, node5} { + waitForNonValidatorInboundConnection(t, validator, node6) + waitForNonValidatorInboundConnection(t, validator, node7) + } + // Verify connections of active validators. + for _, validator := range []*cmd.Node{node3, node4, node5} { + waitForNonValidatorInboundXOROutboundConnection(t, validator, node1) + waitForNonValidatorInboundXOROutboundConnection(t, validator, node2) + waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 2, 0, 2) + } + // Verify connection counts of inactive validators. + for _, validator := range []*cmd.Node{node1, node2} { + waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 3, 0, 2) + } + // Verify connection counts of non-validators. + waitForCountRemoteNodeIndexer(t, node6, 5, 3, 2, 0) + waitForCountRemoteNodeIndexer(t, node7, 5, 3, 2, 0) + t.Logf("Test #2 passed | Successfully run validators node3, node4, node5; inactive-validators node1, node2; " + + "non-validators node6, node7") + + // Remove node3 from the validator set. Make node1 active again. + setGetActiveValidatorImplWithValidatorNodes(t, node1, node4, node5) + // Verify full graph between active validators. + waitForValidatorFullGraph(t, node1, node4, node5) + // Verify connections of non-validators. + for _, nonValidator := range []*cmd.Node{node2, node3, node6, node7} { + waitForValidatorConnectionOneWay(t, nonValidator, node1, node4, node5) + } + // Verify connections of initial validators. + for _, validator := range []*cmd.Node{node1, node2, node3, node4, node5} { + waitForNonValidatorInboundConnection(t, validator, node6) + waitForNonValidatorInboundConnection(t, validator, node7) + } + // Verify connections of active validators. + for _, validator := range []*cmd.Node{node1, node4, node5} { + waitForNonValidatorInboundXOROutboundConnection(t, validator, node2) + waitForNonValidatorInboundXOROutboundConnection(t, validator, node3) + waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 2, 0, 2) + } + // Verify connection counts of inactive validators. + for _, validator := range []*cmd.Node{node2, node3} { + waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 3, 0, 2) + } + // Verify connection counts of non-validators. + waitForCountRemoteNodeIndexer(t, node6, 5, 3, 2, 0) + waitForCountRemoteNodeIndexer(t, node7, 5, 3, 2, 0) + t.Logf("Test #3 passed | Successfully run validators node1, node4, node5; inactive validators node2, node3; " + + "non-validators node6, node7") + + // Make all validators inactive. + setGetActiveValidatorImplWithValidatorNodes(t) + // NOOP Verify full graph between active validators. + // NOOP Verify connections of non-validators. + // Verify connections of initial validators. + for _, validator := range []*cmd.Node{node1, node2, node3, node4, node5} { + waitForNonValidatorInboundConnection(t, validator, node6) + waitForNonValidatorInboundConnection(t, validator, node7) + } + // NOOP Verify connections of active validators. + // Verify connections and counts of inactive validators. + inactiveValidators := []*cmd.Node{node1, node2, node3, node4, node5} + for ii := 0; ii < len(inactiveValidators); ii++ { + for jj := ii + 1; jj < len(inactiveValidators); jj++ { + waitForNonValidatorInboundXOROutboundConnection(t, inactiveValidators[ii], inactiveValidators[jj]) + } + } + inactiveValidatorsRev := []*cmd.Node{node5, node4, node3, node2, node1} + for ii := 0; ii < len(inactiveValidatorsRev); ii++ { + for jj := ii + 1; jj < len(inactiveValidatorsRev); jj++ { + waitForNonValidatorInboundXOROutboundConnection(t, inactiveValidatorsRev[ii], inactiveValidatorsRev[jj]) + } + } + for _, validator := range inactiveValidators { + waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 0, 0, 2) + } + // Verify connection counts of non-validators. + waitForCountRemoteNodeIndexer(t, node6, 5, 0, 5, 0) + waitForCountRemoteNodeIndexer(t, node7, 5, 0, 5, 0) + t.Logf("Test #4 passed | Successfully run inactive validators node1, node2, node3, node4, node5; " + + "non-validators node6, node7") +} + +func TestConnectionControllerNonValidatorConnector(t *testing.T) { + require := require.New(t) + + // Spawn 6 non-validators node1, node2, node3, node4, node5, node6. Set node1's targetOutboundPeers to 3. Then make + // node1 create outbound connections to node2, node3, and node4, as well as 2 attempted persistent connections to + // non-existing ips. + node1 := spawnNonValidatorNodeProtocol2(t, 18000, "node1") + node1.Config.TargetOutboundPeers = 3 + node2 := spawnNonValidatorNodeProtocol2(t, 18001, "node2") + node3 := spawnNonValidatorNodeProtocol2(t, 18002, "node3") + node4 := spawnNonValidatorNodeProtocol2(t, 18003, "node4") + + node2 = startNode(t, node2) + defer node2.Stop() + node3 = startNode(t, node3) + defer node3.Stop() + node4 = startNode(t, node4) + defer node4.Stop() + + node1.Config.ConnectIPs = []string{ + node2.Listeners[0].Addr().String(), + node3.Listeners[0].Addr().String(), + node4.Listeners[0].Addr().String(), + } + node1 = startNode(t, node1) + defer node1.Stop() + + cc := node1.Server.GetConnectionController() + require.NoError(cc.CreateNonValidatorPersistentOutboundConnection("127.0.0.1:18004")) + require.NoError(cc.CreateNonValidatorPersistentOutboundConnection("127.0.0.1:18005")) + + waitForCountRemoteNodeIndexer(t, node1, 3, 0, 3, 0) +} + +func TestConnectionControllerNonValidatorCircularConnectIps(t *testing.T) { + node1 := spawnNonValidatorNodeProtocol2(t, 18000, "node1") + node2 := spawnNonValidatorNodeProtocol2(t, 18001, "node2") + + node1.Config.ConnectIPs = []string{"127.0.0.1:18001"} + node2.Config.ConnectIPs = []string{"127.0.0.1:18000"} + + node1 = startNode(t, node1) + node2 = startNode(t, node2) + defer node1.Stop() + defer node2.Stop() + + waitForCountRemoteNodeIndexer(t, node1, 2, 0, 1, 1) + waitForCountRemoteNodeIndexer(t, node2, 2, 0, 1, 1) +} + +func setGetActiveValidatorImplWithValidatorNodes(t *testing.T, validators ...*cmd.Node) { + require := require.New(t) + + mapping := collections.NewConcurrentMap[bls.SerializedPublicKey, *lib.ValidatorEntry]() + for _, validator := range validators { + seed := validator.Config.PosValidatorSeed + if seed == "" { + t.Fatalf("Validator node %s does not have a PosValidatorSeed set", validator.Params.UserAgent) + } + keystore, err := lib.NewBLSKeystore(seed) + require.NoError(err) + mapping.Set(keystore.GetSigner().GetPublicKey().Serialize(), createSimpleValidatorEntry(validator)) + } + setGetActiveValidatorImpl(func() *collections.ConcurrentMap[bls.SerializedPublicKey, *lib.ValidatorEntry] { + return mapping + }) +} + +func setGetActiveValidatorImpl(mapping func() *collections.ConcurrentMap[bls.SerializedPublicKey, *lib.ValidatorEntry]) { + lib.GetActiveValidatorImpl = mapping +} + +func createSimpleValidatorEntry(node *cmd.Node) *lib.ValidatorEntry { + return &lib.ValidatorEntry{ + Domains: [][]byte{[]byte(node.Listeners[0].Addr().String())}, + } +} + +func waitForValidatorFullGraph(t *testing.T, validators ...*cmd.Node) { + for ii := 0; ii < len(validators); ii++ { + waitForValidatorConnectionOneWay(t, validators[ii], validators[ii+1:]...) + } +} + +func waitForValidatorConnectionOneWay(t *testing.T, n *cmd.Node, validators ...*cmd.Node) { + if len(validators) == 0 { + return + } + for _, validator := range validators { + waitForValidatorConnection(t, n, validator) + } +} + +func waitForNonValidatorInboundXOROutboundConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) { + userAgentN1 := node1.Params.UserAgent + userAgentN2 := node2.Params.UserAgent + conditionInbound := conditionNonValidatorInboundConnectionDynamic(t, node1, node2, true) + conditionOutbound := conditionNonValidatorOutboundConnectionDynamic(t, node1, node2, true) + xorCondition := func() bool { + return conditionInbound() != conditionOutbound() + } + waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to inbound XOR outbound non-validator Node (%s)", + userAgentN1, userAgentN2), xorCondition) +} + +func waitForMinNonValidatorCountRemoteNodeIndexer(t *testing.T, node *cmd.Node, allCount int, validatorCount int, + minNonValidatorOutboundCount int, minNonValidatorInboundCount int) { + + userAgent := node.Params.UserAgent + rnManager := node.Server.GetConnectionController().GetRemoteNodeManager() + condition := func() bool { + return checkRemoteNodeIndexerMinNonValidatorCount(rnManager, allCount, validatorCount, + minNonValidatorOutboundCount, minNonValidatorInboundCount) + } + waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to have at least %d non-validator outbound nodes and %d non-validator inbound nodes", + userAgent, minNonValidatorOutboundCount, minNonValidatorInboundCount), condition) +} + +func checkRemoteNodeIndexerMinNonValidatorCount(manager *lib.RemoteNodeManager, allCount int, validatorCount int, + minNonValidatorOutboundCount int, minNonValidatorInboundCount int) bool { + + if allCount != manager.GetAllRemoteNodes().Count() { + return false + } + if validatorCount != manager.GetValidatorIndex().Count() { + return false + } + if minNonValidatorOutboundCount > manager.GetNonValidatorOutboundIndex().Count() { + return false + } + if minNonValidatorInboundCount > manager.GetNonValidatorInboundIndex().Count() { + return false + } + if allCount != manager.GetValidatorIndex().Count()+ + manager.GetNonValidatorOutboundIndex().Count()+ + manager.GetNonValidatorInboundIndex().Count() { + return false + } + return true +} diff --git a/integration_testing/connection_controller_utils_test.go b/integration_testing/connection_controller_utils_test.go index 4d5594634..f4a46df75 100644 --- a/integration_testing/connection_controller_utils_test.go +++ b/integration_testing/connection_controller_utils_test.go @@ -7,6 +7,7 @@ import ( "github.com/deso-protocol/core/lib" "os" "testing" + "time" ) func waitForValidatorConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) { @@ -26,14 +27,24 @@ func waitForValidatorConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) } return true } - waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to outbound non-validator Node (%s)", userAgentN1, userAgentN2), n1ValidatedN2) + waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to validator Node (%s)", userAgentN1, userAgentN2), n1ValidatedN2) } func waitForNonValidatorOutboundConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) { userAgentN1 := node1.Params.UserAgent + userAgentN2 := node2.Params.UserAgent + condition := conditionNonValidatorOutboundConnection(t, node1, node2) + waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to outbound non-validator Node (%s)", userAgentN1, userAgentN2), condition) +} + +func conditionNonValidatorOutboundConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) func() bool { + return conditionNonValidatorOutboundConnectionDynamic(t, node1, node2, false) +} + +func conditionNonValidatorOutboundConnectionDynamic(t *testing.T, node1 *cmd.Node, node2 *cmd.Node, inactiveValidator bool) func() bool { userAgentN2 := node2.Params.UserAgent rnManagerN1 := node1.Server.GetConnectionController().GetRemoteNodeManager() - n1ValidatedN2 := func() bool { + return func() bool { if true != checkRemoteNodeIndexerUserAgent(rnManagerN1, userAgentN2, false, true, false) { return false } @@ -44,19 +55,29 @@ func waitForNonValidatorOutboundConnection(t *testing.T, node1 *cmd.Node, node2 if !rnFromN2.IsHandshakeCompleted() { return false } - if rnFromN2.GetValidatorPublicKey() != nil { - return false + // inactiveValidator should have the public key. + if inactiveValidator { + return rnFromN2.GetValidatorPublicKey() != nil } - return true + return rnFromN2.GetValidatorPublicKey() == nil } - waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to outbound non-validator Node (%s)", userAgentN1, userAgentN2), n1ValidatedN2) } func waitForNonValidatorInboundConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) { userAgentN1 := node1.Params.UserAgent + userAgentN2 := node2.Params.UserAgent + condition := conditionNonValidatorInboundConnection(t, node1, node2) + waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to inbound non-validator Node (%s)", userAgentN1, userAgentN2), condition) +} + +func conditionNonValidatorInboundConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) func() bool { + return conditionNonValidatorInboundConnectionDynamic(t, node1, node2, false) +} + +func conditionNonValidatorInboundConnectionDynamic(t *testing.T, node1 *cmd.Node, node2 *cmd.Node, inactiveValidator bool) func() bool { userAgentN2 := node2.Params.UserAgent rnManagerN1 := node1.Server.GetConnectionController().GetRemoteNodeManager() - n1ValidatedN2 := func() bool { + return func() bool { if true != checkRemoteNodeIndexerUserAgent(rnManagerN1, userAgentN2, false, false, true) { return false } @@ -67,12 +88,12 @@ func waitForNonValidatorInboundConnection(t *testing.T, node1 *cmd.Node, node2 * if !rnFromN2.IsHandshakeCompleted() { return false } - if rnFromN2.GetValidatorPublicKey() != nil { - return false + // inactiveValidator should have the public key. + if inactiveValidator { + return rnFromN2.GetValidatorPublicKey() != nil } - return true + return rnFromN2.GetValidatorPublicKey() == nil } - waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to inbound non-validator Node (%s)", userAgentN1, userAgentN2), n1ValidatedN2) } func waitForEmptyRemoteNodeIndexer(t *testing.T, node1 *cmd.Node) { @@ -90,15 +111,15 @@ func waitForEmptyRemoteNodeIndexer(t *testing.T, node1 *cmd.Node) { func waitForCountRemoteNodeIndexer(t *testing.T, node1 *cmd.Node, allCount int, validatorCount int, nonValidatorOutboundCount int, nonValidatorInboundCount int) { - userAgentN1 := node1.Params.UserAgent - rnManagerN1 := node1.Server.GetConnectionController().GetRemoteNodeManager() - n1ValidatedN2 := func() bool { - if true != checkRemoteNodeIndexerCount(rnManagerN1, allCount, validatorCount, nonValidatorOutboundCount, nonValidatorInboundCount) { + userAgent := node1.Params.UserAgent + rnManager := node1.Server.GetConnectionController().GetRemoteNodeManager() + condition := func() bool { + if true != checkRemoteNodeIndexerCount(rnManager, allCount, validatorCount, nonValidatorOutboundCount, nonValidatorInboundCount) { return false } return true } - waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to have appropriate RemoteNodes counts", userAgentN1), n1ValidatedN2) + waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to have appropriate RemoteNodes counts", userAgent), condition) } func checkRemoteNodeIndexerUserAgent(manager *lib.RemoteNodeManager, userAgent string, validator bool, @@ -202,5 +223,7 @@ func spawnValidatorNodeProtocol2(t *testing.T, port uint32, id string, blsPriv * node := cmd.NewNode(config) node.Params.UserAgent = id node.Params.ProtocolVersion = lib.ProtocolVersion2 + node.Params.VersionNegotiationTimeout = 1 * time.Second + node.Params.VerackNegotiationTimeout = 1 * time.Second return node } diff --git a/integration_testing/tools.go b/integration_testing/tools.go index 2f97e942d..4db913136 100644 --- a/integration_testing/tools.go +++ b/integration_testing/tools.go @@ -69,7 +69,7 @@ func generateConfig(t *testing.T, port uint32, dataDir string, maxPeers uint32) config.MaxSyncBlockHeight = 0 config.ConnectIPs = []string{} config.PrivateMode = true - config.GlogV = 0 + config.GlogV = 2 config.GlogVmodule = "*bitcoin_manager*=0,*balance*=0,*view*=0,*frontend*=0,*peer*=0,*addr*=0,*network*=0,*utils*=0,*connection*=0,*main*=0,*server*=0,*mempool*=0,*miner*=0,*blockchain*=0" config.MaxInboundPeers = maxPeers config.TargetOutboundPeers = maxPeers diff --git a/lib/connection_controller.go b/lib/connection_controller.go index fef9fa887..5bba356cb 100644 --- a/lib/connection_controller.go +++ b/lib/connection_controller.go @@ -5,12 +5,23 @@ import ( "github.com/btcsuite/btcd/addrmgr" "github.com/btcsuite/btcd/wire" "github.com/deso-protocol/core/bls" + "github.com/deso-protocol/core/collections" "github.com/golang/glog" "github.com/pkg/errors" "net" "strconv" + "sync" + "time" ) +type GetActiveValidatorsFunc func() *collections.ConcurrentMap[bls.SerializedPublicKey, *ValidatorEntry] + +var GetActiveValidatorImpl GetActiveValidatorsFunc = BasicGetActiveValidators + +func BasicGetActiveValidators() *collections.ConcurrentMap[bls.SerializedPublicKey, *ValidatorEntry] { + return collections.NewConcurrentMap[bls.SerializedPublicKey, *ValidatorEntry]() +} + // ConnectionController is a structure that oversees all connections to remote nodes. It is responsible for kicking off // the initial connections a node makes to the network. It is also responsible for creating RemoteNodes from all // successful outbound and inbound connections. The ConnectionController also ensures that the node is connected to @@ -44,11 +55,16 @@ type ConnectionController struct { // When true, only one connection per IP is allowed. Prevents eclipse attacks // among other things. limitOneInboundRemoteNodePerIP bool + + startGroup sync.WaitGroup + exitChan chan struct{} + exitGroup sync.WaitGroup } func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handshakeController *HandshakeController, - rnManager *RemoteNodeManager, blsKeystore *BLSKeystore, addrMgr *addrmgr.AddrManager, targetNonValidatorOutboundRemoteNodes uint32, - targetNonValidatorInboundRemoteNodes uint32, limitOneInboundConnectionPerIP bool) *ConnectionController { + rnManager *RemoteNodeManager, blsKeystore *BLSKeystore, addrMgr *addrmgr.AddrManager, connectIps []string, + targetNonValidatorOutboundRemoteNodes uint32, targetNonValidatorInboundRemoteNodes uint32, + limitOneInboundConnectionPerIP bool) *ConnectionController { return &ConnectionController{ params: params, @@ -57,16 +73,104 @@ func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handsh handshake: handshakeController, rnManager: rnManager, AddrMgr: addrMgr, + connectIps: connectIps, targetNonValidatorOutboundRemoteNodes: targetNonValidatorOutboundRemoteNodes, targetNonValidatorInboundRemoteNodes: targetNonValidatorInboundRemoteNodes, limitOneInboundRemoteNodePerIP: limitOneInboundConnectionPerIP, + exitChan: make(chan struct{}), } } +func (cc *ConnectionController) Start() { + cc.startGroup.Add(3) + cc.initiatePersistentConnections() + // Start the validator connector + go cc.startValidatorConnector() + go cc.startNonValidatorConnector() + go cc.startRemoteNodeCleanup() + + cc.startGroup.Wait() + cc.exitGroup.Add(3) +} + +func (cc *ConnectionController) Stop() { + close(cc.exitChan) + cc.exitGroup.Wait() +} + func (cc *ConnectionController) GetRemoteNodeManager() *RemoteNodeManager { return cc.rnManager } +func (cc *ConnectionController) initiatePersistentConnections() { + // Connect to addresses passed via the --connect-ips flag. These addresses are persistent in the sense that if we + // disconnect from one, we will try to reconnect to the same one. + if len(cc.connectIps) > 0 { + for _, connectIp := range cc.connectIps { + glog.Infof("ConnectionController.initiatePersistentConnections: Connecting to connectIp: %v", connectIp) + if err := cc.CreateNonValidatorPersistentOutboundConnection(connectIp); err != nil { + glog.Errorf("ConnectionController.initiatePersistentConnections: Problem connecting "+ + "to connectIp %v: %v", connectIp, err) + } + } + } +} + +// startValidatorConnector is responsible for ensuring that the node is connected to all active validators. It does +// this in two steps. First, it looks through the already established connections and checks if any of these connections +// are validators. If they are, it adds them to the validator index. It also checks if any of the existing validators +// are no longer active and removes them from the validator index. Second, it checks if any of the active validators +// are missing from the validator index. If they are, it attempts to connect to them. +func (cc *ConnectionController) startValidatorConnector() { + cc.startGroup.Done() + for { + select { + case <-cc.exitChan: + cc.exitGroup.Done() + return + case <-time.After(1 * time.Second): + activeValidatorsMap := GetActiveValidatorImpl() + cc.refreshValidatorIndex(activeValidatorsMap) + cc.connectValidators(activeValidatorsMap) + } + } +} + +// startNonValidatorConnector is responsible for ensuring that the node is connected to the target number of outbound +// and inbound remote nodes. To do this, it periodically checks the number of outbound and inbound remote nodes, and +// if the number is above the target number, it disconnects the excess remote nodes. If the number is below the target +// number, it attempts to connect to new remote nodes. +func (cc *ConnectionController) startNonValidatorConnector() { + cc.startGroup.Done() + + for { + select { + case <-cc.exitChan: + cc.exitGroup.Done() + return + case <-time.After(1 * time.Second): + cc.refreshNonValidatorOutboundIndex() + cc.refreshNonValidatorInboundIndex() + cc.connectNonValidators() + } + } +} + +func (cc *ConnectionController) startRemoteNodeCleanup() { + cc.startGroup.Done() + + for { + select { + case <-cc.exitChan: + cc.exitGroup.Done() + return + case <-time.After(1 * time.Second): + //cc.rnManager.Cleanup() + } + } + +} + // ########################### // ## Handlers (Peer, DeSoMessage) // ########################### @@ -114,7 +218,7 @@ func (cc *ConnectionController) _handleNewConnectionMessage(origin *Peer, desoMs remoteNode, err = cc.processInboundConnection(msg.Connection) if err != nil { glog.Errorf("ConnectionController.handleNewConnectionMessage: Problem handling inbound connection: %v", err) - msg.Connection.Close() + cc.cleanupFailedInboundConnection(remoteNode, msg.Connection) return } case ConnectionTypeOutbound: @@ -130,6 +234,13 @@ func (cc *ConnectionController) _handleNewConnectionMessage(origin *Peer, desoMs cc.handshake.InitiateHandshake(remoteNode) } +func (cc *ConnectionController) cleanupFailedInboundConnection(remoteNode *RemoteNode, connection Connection) { + if remoteNode != nil { + cc.rnManager.Disconnect(remoteNode) + } + connection.Close() +} + func (cc *ConnectionController) cleanupFailedOutboundConnection(connection Connection) { oc, ok := connection.(*outboundConnection) if !ok { @@ -141,13 +252,211 @@ func (cc *ConnectionController) cleanupFailedOutboundConnection(connection Conne if rn != nil { cc.rnManager.Disconnect(rn) } + oc.Close() cc.cmgr.RemoveAttemptedOutboundAddrs(oc.address) } // ########################### -// ## Connections +// ## Validator Connections +// ########################### + +// refreshValidatorIndex re-indexes validators based on the activeValidatorsMap. It is called periodically by the +// validator connector. +func (cc *ConnectionController) refreshValidatorIndex(activeValidatorsMap *collections.ConcurrentMap[bls.SerializedPublicKey, *ValidatorEntry]) { + // De-index inactive validators. We skip any checks regarding RemoteNodes connection status, nor do we verify whether + // de-indexing the validator would result in an excess number of outbound/inbound connections. Any excess connections + // will be cleaned up by the peer connector. + validatorRemoteNodeMap := cc.rnManager.GetValidatorIndex().Copy() + for pk, rn := range validatorRemoteNodeMap { + // If the validator is no longer active, de-index it. + if _, ok := activeValidatorsMap.Get(pk); !ok { + cc.rnManager.SetNonValidator(rn) + cc.rnManager.UnsetValidator(rn) + } + } + + // Look for validators in our existing outbound / inbound connections. + allNonValidators := cc.rnManager.GetAllNonValidators() + for _, rn := range allNonValidators { + // It is possible for a RemoteNode to be in the non-validator indices, and still have a public key. This can happen + // if the RemoteNode advertised support for the SFValidator service flag during handshake, and provided us + // with a public key, and a corresponding proof of possession signature. + pk := rn.GetValidatorPublicKey() + if pk == nil { + continue + } + // It is possible that through unlikely concurrence, and malevolence, two non-validators happen to have the same + // public key, which goes undetected during handshake. To prevent this from affecting the indexing of the validator + // set, we check that the non-validator's public key is not already present in the validator index. + if _, ok := cc.rnManager.GetValidatorIndex().Get(pk.Serialize()); ok { + cc.rnManager.Disconnect(rn) + continue + } + + // If the RemoteNode turns out to be in the validator set, index it. + if _, ok := activeValidatorsMap.Get(pk.Serialize()); ok { + cc.rnManager.SetValidator(rn) + cc.rnManager.UnsetNonValidator(rn) + } + } +} + +// connectValidators attempts to connect to all active validators that are not already connected. It is called +// periodically by the validator connector. +func (cc *ConnectionController) connectValidators(activeValidatorsMap *collections.ConcurrentMap[bls.SerializedPublicKey, *ValidatorEntry]) { + // Look through the active validators and connect to any that we're not already connected to. + if cc.blsKeystore == nil { + return + } + + validators := activeValidatorsMap.Copy() + for pk, validator := range validators { + _, exists := cc.rnManager.GetValidatorIndex().Get(pk) + // If we're already connected to the validator, continue. + if exists { + continue + } + if cc.blsKeystore.GetSigner().GetPublicKey().Serialize() == pk { + continue + } + + publicKey, err := pk.Deserialize() + if err != nil { + continue + } + + // For now, we only dial the first domain in the validator's domain list. + address := string(validator.Domains[0]) + if err := cc.CreateValidatorConnection(address, publicKey); err != nil { + // TODO: Do we want to log an error here? + continue + } + } +} + +// ########################### +// ## NonValidator Connections // ########################### +// refreshNonValidatorOutboundIndex is called periodically by the peer connector. It is responsible for disconnecting excess +// outbound remote nodes. +func (cc *ConnectionController) refreshNonValidatorOutboundIndex() { + // First let's check if we have an excess number of outbound remote nodes. If we do, we'll disconnect some of them. + numOutboundRemoteNodes := uint32(cc.rnManager.GetNonValidatorOutboundIndex().Count()) + excessiveOutboundRemoteNodes := uint32(0) + if numOutboundRemoteNodes > cc.targetNonValidatorOutboundRemoteNodes { + excessiveOutboundRemoteNodes = numOutboundRemoteNodes - cc.targetNonValidatorOutboundRemoteNodes + } + // We group the outbound remote nodes into two categories: attempted and connected. We disconnect the attempted + // remote nodes first, and then the connected remote nodes. + allOutboundRemoteNodes := cc.rnManager.GetNonValidatorOutboundIndex().GetAll() + var attemptedOutboundRemoteNodes, connectedOutboundRemoteNodes []*RemoteNode + for _, rn := range allOutboundRemoteNodes { + if rn.IsHandshakeCompleted() { + connectedOutboundRemoteNodes = append(connectedOutboundRemoteNodes, rn) + } else { + attemptedOutboundRemoteNodes = append(attemptedOutboundRemoteNodes, rn) + } + } + // First disconnect the attempted remote nodes. + for _, rn := range attemptedOutboundRemoteNodes { + if excessiveOutboundRemoteNodes == 0 { + break + } + cc.rnManager.Disconnect(rn) + excessiveOutboundRemoteNodes-- + } + // Now disconnect the connected remote nodes, if we still have too many remote nodes. + for _, rn := range connectedOutboundRemoteNodes { + if excessiveOutboundRemoteNodes == 0 { + break + } + cc.rnManager.Disconnect(rn) + excessiveOutboundRemoteNodes-- + } +} + +// refreshNonValidatorInboundIndex is called periodically by the non-validator connector. It is responsible for +// disconnecting excess inbound remote nodes. +func (cc *ConnectionController) refreshNonValidatorInboundIndex() { + // First let's check if we have an excess number of inbound remote nodes. If we do, we'll disconnect some of them. + numConnectedInboundRemoteNodes := uint32(cc.rnManager.GetNonValidatorInboundIndex().Count()) + excessiveInboundRemoteNodes := uint32(0) + if numConnectedInboundRemoteNodes > cc.targetNonValidatorInboundRemoteNodes { + excessiveInboundRemoteNodes = numConnectedInboundRemoteNodes - cc.targetNonValidatorInboundRemoteNodes + } + // Disconnect random inbound non-validators if we have too many of them. + inboundRemoteNodes := cc.rnManager.GetNonValidatorInboundIndex().GetAll() + for _, rn := range inboundRemoteNodes { + if excessiveInboundRemoteNodes == 0 { + break + } + cc.rnManager.Disconnect(rn) + excessiveInboundRemoteNodes-- + } +} + +func (cc *ConnectionController) connectNonValidators() { + // Only connect to addresses from the addrmgr if we don't specify --connect-ips. These addresses are *not* persistent, + // meaning if we disconnect from one we'll try a different one. + // TODO: We used this condition in the old code to prevent the node from connecting to non-connect-ips nodes. + // I'm not sure whether this is still necessary. I suppose the concern here was that the connect-ips nodes + // should be prioritized over the addrmgr nodes, especially during syncing. However, I think we can achieve the + // same result by defining another flag, like a boolean --sync-from-persistent-peers-only, which could indicate that + // we disregard non-persistent non-connect-ips nodes during syncing, if the flag is set to true. + // FIXME: This about uncommenting the below condition. + //if len(cc.connectIps) == 0 { + // return + //} + + numOutboundPeers := uint32(cc.rnManager.GetNonValidatorOutboundIndex().Count()) + + remainingOutboundPeers := uint32(0) + if numOutboundPeers < cc.targetNonValidatorOutboundRemoteNodes { + remainingOutboundPeers = cc.targetNonValidatorOutboundRemoteNodes - numOutboundPeers + } + for ii := uint32(0); ii < remainingOutboundPeers; ii++ { + addr := cc.getRandomUnconnectedAddress() + if addr == nil { + break + } + cc.AddrMgr.Attempt(addr) + // FIXME: error handle + cc.rnManager.CreateNonValidatorOutboundConnection(addr) + } +} + +func (cc *ConnectionController) getRandomUnconnectedAddress() *wire.NetAddress { + for tries := 0; tries < 100; tries++ { + addr := cc.AddrMgr.GetAddress() + if addr == nil { + //glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: addr from GetAddressWithExclusions was nil") + break + } + + if cc.cmgr.IsConnectedOutboundIpAddress(addr.NetAddress()) { + //glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: Not choosing address due to redundancy %v:%v", addr.NetAddress().IP, addr.NetAddress().Port) + continue + } + + if cc.cmgr.IsAttemptedOutboundIpAddress(addr.NetAddress()) { + continue + } + + // We can only have one outbound address per /16. This is similar to + // Bitcoin and we do it to prevent Sybil attacks. + if cc.cmgr.IsFromRedundantOutboundIPAddress(addr.NetAddress()) { + //glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: Not choosing address due to redundant group key %v:%v", addr.NetAddress().IP, addr.NetAddress().Port) + continue + } + + return addr.NetAddress() + } + + //glog.V(2).Infof("ConnectionManager.getRandomAddr: Returning nil") + return nil +} + func (cc *ConnectionController) CreateValidatorConnection(ipStr string, publicKey *bls.PublicKey) error { netAddr, err := cc.ConvertIPStringToNetAddress(ipStr) if err != nil { @@ -235,8 +544,8 @@ func (cc *ConnectionController) processOutboundConnection(conn Connection) (*Rem } if oc.failed { - return nil, fmt.Errorf("ConnectionController.handleOutboundConnection: Failed to connect to peer (%s)", - oc.address.IP.String()) + return nil, fmt.Errorf("ConnectionController.handleOutboundConnection: Failed to connect to peer (%s:%v)", + oc.address.IP.String(), oc.address.Port) } if !oc.isPersistent { @@ -263,11 +572,35 @@ func (cc *ConnectionController) processOutboundConnection(conn Connection) (*Rem "for addr: (%s)", oc.connection.RemoteAddr().String()) } + // Attach the connection before additional validation steps because it is already established. remoteNode, err := cc.rnManager.AttachOutboundConnection(oc.connection, na, oc.attemptId, oc.isPersistent) if remoteNode == nil || err != nil { return nil, errors.Wrapf(err, "ConnectionController.handleOutboundConnection: Problem calling rnManager.AttachOutboundConnection "+ "for addr: (%s)", oc.connection.RemoteAddr().String()) } + + // If this is a persistent remote node or a validator, we don't need to do any extra connection validation. + if remoteNode.IsPersistent() || remoteNode.GetValidatorPublicKey() != nil { + return remoteNode, nil + } + + // If we get here, it means we're dealing with a non-persistent or non-validator remote node. We perform additional + // connection validation. + + // If we already have enough outbound peers, then don't bother adding this one. + if cc.enoughNonValidatorOutboundConnections() { + return nil, fmt.Errorf("ConnectionController.handleOutboundConnection: Connected to maximum number of outbound "+ + "peers (%d)", cc.targetNonValidatorOutboundRemoteNodes) + } + + // If the group key overlaps with another peer we're already connected to then abort mission. We only connect to + // one peer per IP group in order to prevent Sybil attacks. + if cc.cmgr.IsFromRedundantOutboundIPAddress(oc.address) { + return nil, fmt.Errorf("ConnectionController.handleOutboundConnection: Rejecting OUTBOUND NON-PERSISTENT "+ + "connection with redundant group key (%s).", addrmgr.GroupKey(oc.address)) + } + cc.cmgr.AddToGroupKey(na) + return remoteNode, nil } diff --git a/lib/connection_manager.go b/lib/connection_manager.go index 7c6f510ac..1ba4bf8f1 100644 --- a/lib/connection_manager.go +++ b/lib/connection_manager.go @@ -168,6 +168,10 @@ func NewConnectionManager( // Check if the address passed shares a group with any addresses already in our data structures. func (cmgr *ConnectionManager) IsFromRedundantOutboundIPAddress(na *wire.NetAddress) bool { groupKey := addrmgr.GroupKey(na) + // For the sake of running multiple nodes on the same machine, we allow localhost connections. + if groupKey == "local" { + return false + } cmgr.mtxOutboundConnIPGroups.Lock() numGroupsForKey := cmgr.outboundConnIPGroups[groupKey] @@ -185,7 +189,7 @@ func (cmgr *ConnectionManager) IsFromRedundantOutboundIPAddress(na *wire.NetAddr return true } -func (cmgr *ConnectionManager) addToGroupKey(na *wire.NetAddress) { +func (cmgr *ConnectionManager) AddToGroupKey(na *wire.NetAddress) { groupKey := addrmgr.GroupKey(na) cmgr.mtxOutboundConnIPGroups.Lock() @@ -429,7 +433,6 @@ func (cmgr *ConnectionManager) addPeer(pp *Peer) { // number of outbound peers. Also add the peer's address to // our map. if _, ok := peerList[pp.ID]; !ok { - cmgr.addToGroupKey(pp.netAddr) atomic.AddUint32(&cmgr.numOutboundPeers, 1) cmgr.mtxAddrsMaps.Lock() @@ -528,16 +531,6 @@ func (cmgr *ConnectionManager) _logOutboundPeerData() { numInboundPeers := int(atomic.LoadUint32(&cmgr.numInboundPeers)) numPersistentPeers := int(atomic.LoadUint32(&cmgr.numPersistentPeers)) glog.V(1).Infof("Num peers: OUTBOUND(%d) INBOUND(%d) PERSISTENT(%d)", numOutboundPeers, numInboundPeers, numPersistentPeers) - - cmgr.mtxOutboundConnIPGroups.Lock() - for _, vv := range cmgr.outboundConnIPGroups { - if vv != 0 && vv != 1 { - glog.V(1).Infof("_logOutboundPeerData: Peer group count != (0 or 1). "+ - "Is (%d) instead. This "+ - "should never happen.", vv) - } - } - cmgr.mtxOutboundConnIPGroups.Unlock() } func (cmgr *ConnectionManager) AddTimeSample(addrStr string, timeSample time.Time) { @@ -617,8 +610,13 @@ func (cmgr *ConnectionManager) Start() { select { case oc := <-cmgr.outboundConnectionChan: - glog.V(2).Infof("ConnectionManager.Start: Successfully established an outbound connection with "+ - "(addr= %v)", oc.connection.RemoteAddr()) + if oc.failed { + glog.V(2).Infof("ConnectionManager.Start: Failed to establish an outbound connection with "+ + "(id= %v)", oc.attemptId) + } else { + glog.V(2).Infof("ConnectionManager.Start: Successfully established an outbound connection with "+ + "(addr= %v)", oc.connection.RemoteAddr()) + } delete(cmgr.outboundConnectionAttempts, oc.attemptId) cmgr.serverMessageQueue <- &ServerMessage{ Peer: nil, diff --git a/lib/handshake_controller.go b/lib/handshake_controller.go index bde07745a..f355bad93 100644 --- a/lib/handshake_controller.go +++ b/lib/handshake_controller.go @@ -122,7 +122,7 @@ func (hc *HandshakeController) _handleVersionMessage(origin *Peer, desoMsg DeSoM if hc.usedNonces.Contains(msgNonce) { hc.usedNonces.Delete(msgNonce) glog.Errorf("HandshakeController._handleVersionMessage: Disconnecting RemoteNode with id: (%v) "+ - "nonce collision", origin.ID) + "nonce collision, nonce (%v)", origin.ID, msgNonce) hc.rnManager.Disconnect(rn) return } diff --git a/lib/network_connection.go b/lib/network_connection.go index eb6d4ab55..ffb0bb1f1 100644 --- a/lib/network_connection.go +++ b/lib/network_connection.go @@ -33,7 +33,9 @@ func (oc *outboundConnection) Close() { if oc.terminated { return } - oc.connection.Close() + if oc.connection != nil { + oc.connection.Close() + } oc.terminated = true } @@ -58,7 +60,9 @@ func (ic *inboundConnection) Close() { return } - ic.connection.Close() + if ic.connection != nil { + ic.connection.Close() + } ic.terminated = true } diff --git a/lib/remote_node.go b/lib/remote_node.go index f2d849a36..4ca6f8c12 100644 --- a/lib/remote_node.go +++ b/lib/remote_node.go @@ -219,10 +219,22 @@ func (rn *RemoteNode) IsConnected() bool { return rn.connectionStatus == RemoteNodeStatus_Connected } +func (rn *RemoteNode) IsVersionSent() bool { + return rn.connectionStatus == RemoteNodeStatus_VersionSent +} + +func (rn *RemoteNode) IsVerackSent() bool { + return rn.connectionStatus == RemoteNodeStatus_VerackSent +} + func (rn *RemoteNode) IsHandshakeCompleted() bool { return rn.connectionStatus == RemoteNodeStatus_HandshakeCompleted } +func (rn *RemoteNode) IsTerminated() bool { + return rn.connectionStatus == RemoteNodeStatus_Terminated +} + func (rn *RemoteNode) IsValidator() bool { if !rn.IsHandshakeCompleted() { return false @@ -340,9 +352,9 @@ func (rn *RemoteNode) InitiateHandshake(nonce uint64) error { return fmt.Errorf("InitiateHandshake: Remote node is not connected") } + versionTimeExpected := time.Now().Add(rn.params.VersionNegotiationTimeout) + rn.versionTimeExpected = &versionTimeExpected if rn.GetPeer().IsOutbound() { - versionTimeExpected := time.Now().Add(rn.params.VersionNegotiationTimeout) - rn.versionTimeExpected = &versionTimeExpected if err := rn.sendVersionMessage(nonce); err != nil { return fmt.Errorf("InitiateHandshake: Problem sending version message to peer (id= %d): %v", rn.id, err) } @@ -393,6 +405,16 @@ func (rn *RemoteNode) newVersionMessage(nonce uint64) *MsgDeSoVersion { return ver } +func (rn *RemoteNode) IsTimedOut() bool { + if rn.IsConnected() || rn.IsVersionSent() { + return rn.versionTimeExpected.Before(time.Now()) + } + if rn.IsVerackSent() { + return rn.verackTimeExpected.Before(time.Now()) + } + return false +} + // HandleVersionMessage is called upon receiving a version message from the RemoteNode's peer. The peer may be the one // initiating the handshake, in which case, we should respond with our own version message. To do this, we pass the // responseNonce to this function, which we will use in our response version message. @@ -400,7 +422,7 @@ func (rn *RemoteNode) HandleVersionMessage(verMsg *MsgDeSoVersion, responseNonce rn.mtx.Lock() defer rn.mtx.Unlock() - if rn.connectionStatus != RemoteNodeStatus_Connected && rn.connectionStatus != RemoteNodeStatus_VersionSent { + if !rn.IsConnected() && !rn.IsVersionSent() { return fmt.Errorf("HandleVersionMessage: RemoteNode is not connected or version exchange has already "+ "been completed, connectionStatus: %v", rn.connectionStatus) } @@ -412,7 +434,7 @@ func (rn *RemoteNode) HandleVersionMessage(verMsg *MsgDeSoVersion, responseNonce } // Verify that the peer's version message is sent within the version negotiation timeout. - if rn.versionTimeExpected != nil && rn.versionTimeExpected.Before(time.Now()) { + if rn.versionTimeExpected.Before(time.Now()) { return fmt.Errorf("RemoteNode.HandleVersionMessage: Requesting disconnect for id: (%v) "+ "version timeout. Time expected: %v, now: %v", rn.id, rn.versionTimeExpected.UnixMicro(), time.Now().UnixMicro()) } diff --git a/lib/remote_node_manager.go b/lib/remote_node_manager.go index fb269d072..bff73a985 100644 --- a/lib/remote_node_manager.go +++ b/lib/remote_node_manager.go @@ -126,6 +126,17 @@ func (manager *RemoteNodeManager) SendMessage(rn *RemoteNode, desoMessage DeSoMe return rn.SendMessage(desoMessage) } +func (manager *RemoteNodeManager) Cleanup() { + manager.mtx.Lock() + defer manager.mtx.Unlock() + + for _, rn := range manager.GetAllRemoteNodes().GetAll() { + if rn.IsTimedOut() { + manager.Disconnect(rn) + } + } +} + // ########################### // ## Create RemoteNode // ########################### @@ -140,7 +151,7 @@ func (manager *RemoteNodeManager) CreateValidatorConnection(netAddr *wire.NetAdd } remoteNode := manager.newRemoteNode(publicKey) - if err := remoteNode.DialPersistentOutboundConnection(netAddr); err != nil { + if err := remoteNode.DialOutboundConnection(netAddr); err != nil { return errors.Wrapf(err, "RemoteNodeManager.CreateValidatorConnection: Problem calling DialPersistentOutboundConnection "+ "for addr: (%s:%v)", netAddr.IP.String(), netAddr.Port) } @@ -184,7 +195,7 @@ func (manager *RemoteNodeManager) AttachInboundConnection(conn net.Conn, remoteNode := manager.newRemoteNode(nil) if err := remoteNode.AttachInboundConnection(conn, na); err != nil { - return nil, errors.Wrapf(err, "RemoteNodeManager.AttachInboundConnection: Problem calling AttachInboundConnection "+ + return remoteNode, errors.Wrapf(err, "RemoteNodeManager.AttachInboundConnection: Problem calling AttachInboundConnection "+ "for addr: (%s)", conn.RemoteAddr().String()) } @@ -219,7 +230,7 @@ func (manager *RemoteNodeManager) setRemoteNode(rn *RemoteNode) { manager.mtx.Lock() defer manager.mtx.Unlock() - if rn == nil { + if rn == nil || rn.IsTerminated() { return } @@ -230,7 +241,7 @@ func (manager *RemoteNodeManager) SetNonValidator(rn *RemoteNode) { manager.mtx.Lock() defer manager.mtx.Unlock() - if rn == nil { + if rn == nil || rn.IsTerminated() { return } @@ -245,7 +256,7 @@ func (manager *RemoteNodeManager) SetValidator(remoteNode *RemoteNode) { manager.mtx.Lock() defer manager.mtx.Unlock() - if remoteNode == nil { + if remoteNode == nil || remoteNode.IsTerminated() { return } @@ -260,7 +271,7 @@ func (manager *RemoteNodeManager) UnsetValidator(remoteNode *RemoteNode) { manager.mtx.Lock() defer manager.mtx.Unlock() - if remoteNode == nil { + if remoteNode == nil || remoteNode.IsTerminated() { return } @@ -275,7 +286,7 @@ func (manager *RemoteNodeManager) UnsetNonValidator(rn *RemoteNode) { manager.mtx.Lock() defer manager.mtx.Unlock() - if rn == nil { + if rn == nil || rn.IsTerminated() { return } diff --git a/lib/server.go b/lib/server.go index d4c371955..a4fa28376 100644 --- a/lib/server.go +++ b/lib/server.go @@ -499,8 +499,8 @@ func NewServer( rnManager := NewRemoteNodeManager(srv, _chain, _cmgr, _blsKeystore, _params, _minFeeRateNanosPerKB, nodeServices) srv.handshakeController = NewHandshakeController(rnManager) - srv.connectionController = NewConnectionController(_params, _cmgr, srv.handshakeController, rnManager, - _blsKeystore, _desoAddrMgr, _targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP) + srv.connectionController = NewConnectionController(_params, _cmgr, srv.handshakeController, rnManager, _blsKeystore, + _desoAddrMgr, _connectIps, _targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP) if srv.stateChangeSyncer != nil { srv.stateChangeSyncer.BlockHeight = uint64(_chain.headerTip().Height) @@ -2547,6 +2547,9 @@ func (srv *Server) Stop() { srv.cmgr.Stop() glog.Infof(CLog(Yellow, "Server.Stop: Closed the ConnectionManger")) + srv.connectionController.Stop() + glog.Infof(CLog(Yellow, "Server.Stop: Closed the ConnectionController")) + // Stop the miner if we have one running. if srv.miner != nil { srv.miner.Stop() @@ -2629,6 +2632,8 @@ func (srv *Server) Start() { if srv.miner != nil && len(srv.miner.PublicKeys) > 0 { go srv.miner.Start() } + + srv.connectionController.Start() } // SyncPrefixProgress keeps track of sync progress on an individual prefix. It is used in