Skip to content

Commit

Permalink
persistent connections routine
Browse files Browse the repository at this point in the history
  • Loading branch information
AeonSw4n committed Jan 24, 2024
1 parent e40152a commit 63b1bcd
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 42 deletions.
19 changes: 14 additions & 5 deletions integration_testing/connection_controller_routines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,20 +256,26 @@ func TestConnectionControllerNonValidatorConnector(t *testing.T) {
require := require.New(t)

// Spawn 6 non-validators node1, node2, node3, node4, node5, node6. Set node1's targetOutboundPeers to 3. Then make
// node1 create outbound connections to node2, node3, and node4, as well as 2 attempted persistent connections to
// non-existing ips.
// node1 create persistent outbound connections to node2, node3, and node4, as well as non-validator connections to
// node5 and node6.
node1 := spawnNonValidatorNodeProtocol2(t, 18000, "node1")
node1.Config.TargetOutboundPeers = 3
node1.Config.TargetOutboundPeers = 0
node2 := spawnNonValidatorNodeProtocol2(t, 18001, "node2")
node3 := spawnNonValidatorNodeProtocol2(t, 18002, "node3")
node4 := spawnNonValidatorNodeProtocol2(t, 18003, "node4")
node5 := spawnNonValidatorNodeProtocol2(t, 18004, "node5")
node6 := spawnNonValidatorNodeProtocol2(t, 18005, "node6")

node2 = startNode(t, node2)
defer node2.Stop()
node3 = startNode(t, node3)
defer node3.Stop()
node4 = startNode(t, node4)
defer node4.Stop()
node5 = startNode(t, node5)
defer node5.Stop()
node6 = startNode(t, node6)
defer node6.Stop()

node1.Config.ConnectIPs = []string{
node2.Listeners[0].Addr().String(),
Expand All @@ -280,10 +286,13 @@ func TestConnectionControllerNonValidatorConnector(t *testing.T) {
defer node1.Stop()

cc := node1.Server.GetConnectionController()
require.NoError(cc.CreateNonValidatorPersistentOutboundConnection("127.0.0.1:18004"))
require.NoError(cc.CreateNonValidatorPersistentOutboundConnection("127.0.0.1:18005"))
require.NoError(cc.CreateNonValidatorOutboundConnection(node5.Listeners[0].Addr().String()))
require.NoError(cc.CreateNonValidatorOutboundConnection(node6.Listeners[0].Addr().String()))

waitForCountRemoteNodeIndexer(t, node1, 3, 0, 3, 0)
waitForNonValidatorOutboundConnection(t, node1, node2)
waitForNonValidatorOutboundConnection(t, node1, node3)
waitForNonValidatorOutboundConnection(t, node1, node4)
}

func TestConnectionControllerNonValidatorCircularConnectIps(t *testing.T) {
Expand Down
12 changes: 8 additions & 4 deletions integration_testing/connection_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ func TestConnectionControllerPersistentConnection(t *testing.T) {

// Create a persistent connection from Node1 to Node2
cc := node1.Server.GetConnectionController()
require.NoError(cc.CreateNonValidatorPersistentOutboundConnection(node2.Listeners[0].Addr().String()))
_, err = cc.CreateNonValidatorPersistentOutboundConnection(node2.Listeners[0].Addr().String())
require.NoError(err)
waitForValidatorConnection(t, node1, node2)
waitForNonValidatorInboundConnection(t, node2, node1)
node2.Stop()
Expand All @@ -408,7 +409,8 @@ func TestConnectionControllerPersistentConnection(t *testing.T) {
node3 = startNode(t, node3)

// Create a persistent connection from Node1 to Node3
require.NoError(cc.CreateNonValidatorPersistentOutboundConnection(node3.Listeners[0].Addr().String()))
_, err = cc.CreateNonValidatorPersistentOutboundConnection(node3.Listeners[0].Addr().String())
require.NoError(err)
waitForNonValidatorOutboundConnection(t, node1, node3)
waitForNonValidatorInboundConnection(t, node3, node1)
node3.Stop()
Expand All @@ -429,7 +431,8 @@ func TestConnectionControllerPersistentConnection(t *testing.T) {

// Create a persistent connection from Node4 to Node5
cc = node4.Server.GetConnectionController()
require.NoError(cc.CreateNonValidatorPersistentOutboundConnection(node5.Listeners[0].Addr().String()))
_, err = cc.CreateNonValidatorPersistentOutboundConnection(node5.Listeners[0].Addr().String())
require.NoError(err)
waitForNonValidatorOutboundConnection(t, node4, node5)
waitForValidatorConnection(t, node5, node4)
node5.Stop()
Expand All @@ -444,7 +447,8 @@ func TestConnectionControllerPersistentConnection(t *testing.T) {
defer node6.Stop()

// Create a persistent connection from Node4 to Node6
require.NoError(cc.CreateNonValidatorPersistentOutboundConnection(node6.Listeners[0].Addr().String()))
_, err = cc.CreateNonValidatorPersistentOutboundConnection(node6.Listeners[0].Addr().String())
require.NoError(err)
waitForValidatorConnection(t, node4, node6)
waitForValidatorConnection(t, node6, node4)
t.Logf("Test #4 passed | Successfuly created persistent connection from validator Node4 to validator Node6")
Expand Down
3 changes: 0 additions & 3 deletions integration_testing/connection_controller_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/deso-protocol/core/lib"
"os"
"testing"
"time"
)

func waitForValidatorConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) {
Expand Down Expand Up @@ -223,7 +222,5 @@ func spawnValidatorNodeProtocol2(t *testing.T, port uint32, id string, blsPriv *
node := cmd.NewNode(config)
node.Params.UserAgent = id
node.Params.ProtocolVersion = lib.ProtocolVersion2
node.Params.VersionNegotiationTimeout = 1 * time.Second
node.Params.VerackNegotiationTimeout = 1 * time.Second
return node
}
102 changes: 76 additions & 26 deletions lib/connection_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ type ConnectionController struct {
// it's aware of at random and provides it to us.
AddrMgr *addrmgr.AddrManager

// When --connectips is set, we don't connect to anything from the addrmgr.
// When --connect-ips is set, we don't connect to anything from the addrmgr.
connectIps []string
// persistentIpToRemoteNodeIdsMap maps persistent IP addresses, like the --connect-ips, to the RemoteNodeIds of the
// corresponding RemoteNodes. This is used to ensure that we don't connect to the same persistent IP address twice.
// And that we can reconnect to the same persistent IP address if we disconnect from it.
persistentIpToRemoteNodeIdsMap map[string]RemoteNodeId

// The target number of non-validator outbound remote nodes we want to have. We will disconnect remote nodes once
// we've exceeded this number of outbound connections.
Expand Down Expand Up @@ -74,6 +78,7 @@ func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handsh
rnManager: rnManager,
AddrMgr: addrMgr,
connectIps: connectIps,
persistentIpToRemoteNodeIdsMap: make(map[string]RemoteNodeId),
targetNonValidatorOutboundRemoteNodes: targetNonValidatorOutboundRemoteNodes,
targetNonValidatorInboundRemoteNodes: targetNonValidatorInboundRemoteNodes,
limitOneInboundRemoteNodePerIP: limitOneInboundConnectionPerIP,
Expand All @@ -82,15 +87,14 @@ func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handsh
}

func (cc *ConnectionController) Start() {
cc.startGroup.Add(3)
cc.initiatePersistentConnections()
// Start the validator connector
cc.startGroup.Add(4)
go cc.startPersistentConnector()
go cc.startValidatorConnector()
go cc.startNonValidatorConnector()
go cc.startRemoteNodeCleanup()

cc.startGroup.Wait()
cc.exitGroup.Add(3)
cc.exitGroup.Add(4)
}

func (cc *ConnectionController) Stop() {
Expand All @@ -102,16 +106,15 @@ func (cc *ConnectionController) GetRemoteNodeManager() *RemoteNodeManager {
return cc.rnManager
}

func (cc *ConnectionController) initiatePersistentConnections() {
// Connect to addresses passed via the --connect-ips flag. These addresses are persistent in the sense that if we
// disconnect from one, we will try to reconnect to the same one.
if len(cc.connectIps) > 0 {
for _, connectIp := range cc.connectIps {
glog.Infof("ConnectionController.initiatePersistentConnections: Connecting to connectIp: %v", connectIp)
if err := cc.CreateNonValidatorPersistentOutboundConnection(connectIp); err != nil {
glog.Errorf("ConnectionController.initiatePersistentConnections: Problem connecting "+
"to connectIp %v: %v", connectIp, err)
}
func (cc *ConnectionController) startPersistentConnector() {
cc.startGroup.Done()
for {
select {
case <-cc.exitChan:
cc.exitGroup.Done()
return
case <-time.After(1 * time.Second):
cc.refreshConnectIps()
}
}
}
Expand Down Expand Up @@ -164,8 +167,8 @@ func (cc *ConnectionController) startRemoteNodeCleanup() {
case <-cc.exitChan:
cc.exitGroup.Done()
return
case <-time.After(1 * time.Second):
//cc.rnManager.Cleanup()
case <-time.After(30 * time.Second):
cc.rnManager.Cleanup()
}
}

Expand All @@ -181,6 +184,12 @@ func (cc *ConnectionController) _handleDonePeerMessage(origin *Peer, desoMsg DeS
}

cc.rnManager.DisconnectById(NewRemoteNodeId(origin.ID))
// Update the persistentIpToRemoteNodeIdsMap.
for ip, id := range cc.persistentIpToRemoteNodeIdsMap {
if id.ToUint64() == origin.ID {
delete(cc.persistentIpToRemoteNodeIdsMap, ip)
}
}
}

func (cc *ConnectionController) _handleAddrMessage(origin *Peer, desoMsg DeSoMessage) {
Expand Down Expand Up @@ -256,6 +265,30 @@ func (cc *ConnectionController) cleanupFailedOutboundConnection(connection Conne
cc.cmgr.RemoveAttemptedOutboundAddrs(oc.address)
}

// ###########################
// ## Persistent Connections
// ###########################

func (cc *ConnectionController) refreshConnectIps() {
// Connect to addresses passed via the --connect-ips flag. These addresses are persistent in the sense that if we
// disconnect from one, we will try to reconnect to the same one.
for _, connectIp := range cc.connectIps {
if _, ok := cc.persistentIpToRemoteNodeIdsMap[connectIp]; ok {
continue
}

glog.Infof("ConnectionController.initiatePersistentConnections: Connecting to connectIp: %v", connectIp)
id, err := cc.CreateNonValidatorPersistentOutboundConnection(connectIp)
if err != nil {
glog.Errorf("ConnectionController.initiatePersistentConnections: Problem connecting "+
"to connectIp %v: %v", connectIp, err)
continue
}

cc.persistentIpToRemoteNodeIdsMap[connectIp] = id
}
}

// ###########################
// ## Validator Connections
// ###########################
Expand Down Expand Up @@ -341,23 +374,40 @@ func (cc *ConnectionController) connectValidators(activeValidatorsMap *collectio
// refreshNonValidatorOutboundIndex is called periodically by the peer connector. It is responsible for disconnecting excess
// outbound remote nodes.
func (cc *ConnectionController) refreshNonValidatorOutboundIndex() {
// First let's check if we have an excess number of outbound remote nodes. If we do, we'll disconnect some of them.
// There are three categories of outbound remote nodes: attempted, connected, and persistent. All of these
// remote nodes are stored in the same non-validator outbound index. We want to disconnect excess remote nodes that
// are not persistent, starting with the attempted nodes first.

// First let's run a quick check to see if the number of our non-validator remote nodes exceeds our target. Note that
// this number will include the persistent nodes.
numOutboundRemoteNodes := uint32(cc.rnManager.GetNonValidatorOutboundIndex().Count())
excessiveOutboundRemoteNodes := uint32(0)
if numOutboundRemoteNodes > cc.targetNonValidatorOutboundRemoteNodes {
excessiveOutboundRemoteNodes = numOutboundRemoteNodes - cc.targetNonValidatorOutboundRemoteNodes
if numOutboundRemoteNodes <= cc.targetNonValidatorOutboundRemoteNodes {
return
}
// We group the outbound remote nodes into two categories: attempted and connected. We disconnect the attempted
// remote nodes first, and then the connected remote nodes.

// If we get here, it means that we should potentially disconnect some remote nodes. Let's first separate the
// attempted and connected remote nodes, ignoring the persistent ones.
allOutboundRemoteNodes := cc.rnManager.GetNonValidatorOutboundIndex().GetAll()
var attemptedOutboundRemoteNodes, connectedOutboundRemoteNodes []*RemoteNode
for _, rn := range allOutboundRemoteNodes {
if rn.IsHandshakeCompleted() {
if rn.IsPersistent() {
// We do nothing for persistent remote nodes.
continue
} else if rn.IsHandshakeCompleted() {
connectedOutboundRemoteNodes = append(connectedOutboundRemoteNodes, rn)
} else {
attemptedOutboundRemoteNodes = append(attemptedOutboundRemoteNodes, rn)
}
}

// Having separated the attempted and connected remote nodes, we can now find the actual number of attempted and
// connected remote nodes. We can then find out how many remote nodes we need to disconnect.
numOutboundRemoteNodes = uint32(len(attemptedOutboundRemoteNodes) + len(connectedOutboundRemoteNodes))
excessiveOutboundRemoteNodes := uint32(0)
if numOutboundRemoteNodes > cc.targetNonValidatorOutboundRemoteNodes {
excessiveOutboundRemoteNodes = numOutboundRemoteNodes - cc.targetNonValidatorOutboundRemoteNodes
}

// First disconnect the attempted remote nodes.
for _, rn := range attemptedOutboundRemoteNodes {
if excessiveOutboundRemoteNodes == 0 {
Expand Down Expand Up @@ -465,10 +515,10 @@ func (cc *ConnectionController) CreateValidatorConnection(ipStr string, publicKe
return cc.rnManager.CreateValidatorConnection(netAddr, publicKey)
}

func (cc *ConnectionController) CreateNonValidatorPersistentOutboundConnection(ipStr string) error {
func (cc *ConnectionController) CreateNonValidatorPersistentOutboundConnection(ipStr string) (RemoteNodeId, error) {
netAddr, err := cc.ConvertIPStringToNetAddress(ipStr)
if err != nil {
return err
return 0, err
}
return cc.rnManager.CreateNonValidatorPersistentOutboundConnection(netAddr)
}
Expand Down
8 changes: 4 additions & 4 deletions lib/remote_node_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,19 +160,19 @@ func (manager *RemoteNodeManager) CreateValidatorConnection(netAddr *wire.NetAdd
return nil
}

func (manager *RemoteNodeManager) CreateNonValidatorPersistentOutboundConnection(netAddr *wire.NetAddress) error {
func (manager *RemoteNodeManager) CreateNonValidatorPersistentOutboundConnection(netAddr *wire.NetAddress) (RemoteNodeId, error) {
if netAddr == nil {
return fmt.Errorf("RemoteNodeManager.CreateNonValidatorPersistentOutboundConnection: netAddr is nil")
return 0, fmt.Errorf("RemoteNodeManager.CreateNonValidatorPersistentOutboundConnection: netAddr is nil")
}

remoteNode := manager.newRemoteNode(nil)
if err := remoteNode.DialPersistentOutboundConnection(netAddr); err != nil {
return errors.Wrapf(err, "RemoteNodeManager.CreateNonValidatorPersistentOutboundConnection: Problem calling DialPersistentOutboundConnection "+
return 0, errors.Wrapf(err, "RemoteNodeManager.CreateNonValidatorPersistentOutboundConnection: Problem calling DialPersistentOutboundConnection "+
"for addr: (%s:%v)", netAddr.IP.String(), netAddr.Port)
}
manager.setRemoteNode(remoteNode)
manager.GetNonValidatorOutboundIndex().Set(remoteNode.GetId(), remoteNode)
return nil
return remoteNode.GetId(), nil
}

func (manager *RemoteNodeManager) CreateNonValidatorOutboundConnection(netAddr *wire.NetAddress) error {
Expand Down

0 comments on commit 63b1bcd

Please sign in to comment.