Skip to content

Commit

Permalink
Validators as non-persistent connections; Cleanup routine; IPGroupLim…
Browse files Browse the repository at this point in the history
…it fixes
  • Loading branch information
AeonSw4n committed Jan 22, 2024
1 parent 437e535 commit f1fe09a
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 60 deletions.
42 changes: 27 additions & 15 deletions integration_testing/connection_controller_routines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"github.com/deso-protocol/core/bls"
"github.com/deso-protocol/core/cmd"
"github.com/deso-protocol/core/collections"
"github.com/deso-protocol/core/lib"
"github.com/stretchr/testify/require"
"testing"
Expand Down Expand Up @@ -76,11 +77,11 @@ func TestConnectionControllerInitiatePersistentConnections(t *testing.T) {
waitForCountRemoteNodeIndexer(t, node3, 1, 1, 0, 0)
waitForCountRemoteNodeIndexer(t, node4, 2, 2, 0, 0)
waitForCountRemoteNodeIndexer(t, node5, 2, 2, 0, 0)
node6.Stop()
node2.Stop()
node3.Stop()
node4.Stop()
node5.Stop()
node6.Stop()
t.Logf("Test #2 passed | Successfully run validator node6 with --connect-ips set to node2, node3, node4, node5")
}

Expand Down Expand Up @@ -254,8 +255,8 @@ func TestConnectionControllerValidatorConnector(t *testing.T) {
func TestConnectionControllerNonValidatorConnector(t *testing.T) {
require := require.New(t)

// Spawn 4 non-validators node1, node2, node3, node4. Set node1's targetOutboundPeers to 3. Then make node1
// create outbound connections to node2, node3, and node4, as well as 2 attempted persistent connections to
// Spawn 6 non-validators node1, node2, node3, node4, node5, node6. Set node1's targetOutboundPeers to 3. Then make
// node1 create outbound connections to node2, node3, and node4, as well as 2 attempted persistent connections to
// non-existing ips.
node1 := spawnNonValidatorNodeProtocol2(t, 18000, "node1")
node1.Config.TargetOutboundPeers = 3
Expand Down Expand Up @@ -285,27 +286,41 @@ func TestConnectionControllerNonValidatorConnector(t *testing.T) {
waitForCountRemoteNodeIndexer(t, node1, 3, 0, 3, 0)
}

func TestConnectionControllerNonValidatorCircularConnectIps(t *testing.T) {
node1 := spawnNonValidatorNodeProtocol2(t, 18000, "node1")
node2 := spawnNonValidatorNodeProtocol2(t, 18001, "node2")

node1.Config.ConnectIPs = []string{"127.0.0.1:18001"}
node2.Config.ConnectIPs = []string{"127.0.0.1:18000"}

node1 = startNode(t, node1)
node2 = startNode(t, node2)
defer node1.Stop()
defer node2.Stop()

waitForCountRemoteNodeIndexer(t, node1, 2, 0, 1, 1)
waitForCountRemoteNodeIndexer(t, node2, 2, 0, 1, 1)
}

func setGetActiveValidatorImplWithValidatorNodes(t *testing.T, validators ...*cmd.Node) {
require := require.New(t)

var err error
mapping := make(map[bls.SerializedPublicKey]*lib.ValidatorEntry)
mapping := collections.NewConcurrentMap[bls.SerializedPublicKey, *lib.ValidatorEntry]()
for _, validator := range validators {
seed := validator.Config.PosValidatorSeed
if seed == "" {
t.Fatalf("Validator node %s does not have a PosValidatorSeed set", validator.Params.UserAgent)
}
blsPriv := &bls.PrivateKey{}
blsPriv, err = blsPriv.FromString(seed)
keystore, err := lib.NewBLSKeystore(seed)
require.NoError(err)
mapping[blsPriv.PublicKey().Serialize()] = createSimpleValidatorEntry(validator)
mapping.Set(keystore.GetSigner().GetPublicKey().Serialize(), createSimpleValidatorEntry(validator))
}
setGetActiveValidatorImpl(func() map[bls.SerializedPublicKey]*lib.ValidatorEntry {
setGetActiveValidatorImpl(func() *collections.ConcurrentMap[bls.SerializedPublicKey, *lib.ValidatorEntry] {
return mapping
})
}

func setGetActiveValidatorImpl(mapping func() map[bls.SerializedPublicKey]*lib.ValidatorEntry) {
func setGetActiveValidatorImpl(mapping func() *collections.ConcurrentMap[bls.SerializedPublicKey, *lib.ValidatorEntry]) {
lib.GetActiveValidatorImpl = mapping
}

Expand Down Expand Up @@ -348,11 +363,8 @@ func waitForMinNonValidatorCountRemoteNodeIndexer(t *testing.T, node *cmd.Node,
userAgent := node.Params.UserAgent
rnManager := node.Server.GetConnectionController().GetRemoteNodeManager()
condition := func() bool {
if true != checkRemoteNodeIndexerMinNonValidatorCount(rnManager, allCount, validatorCount,
minNonValidatorOutboundCount, minNonValidatorInboundCount) {
return false
}
return true
return checkRemoteNodeIndexerMinNonValidatorCount(rnManager, allCount, validatorCount,
minNonValidatorOutboundCount, minNonValidatorInboundCount)
}
waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to have at least %d non-validator outbound nodes and %d non-validator inbound nodes",
userAgent, minNonValidatorOutboundCount, minNonValidatorInboundCount), condition)
Expand Down
5 changes: 4 additions & 1 deletion integration_testing/connection_controller_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/deso-protocol/core/lib"
"os"
"testing"
"time"
)

func waitForValidatorConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) {
Expand All @@ -26,7 +27,7 @@ func waitForValidatorConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node)
}
return true
}
waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to outbound non-validator Node (%s)", userAgentN1, userAgentN2), n1ValidatedN2)
waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to validator Node (%s)", userAgentN1, userAgentN2), n1ValidatedN2)
}

func waitForNonValidatorOutboundConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) {
Expand Down Expand Up @@ -222,5 +223,7 @@ func spawnValidatorNodeProtocol2(t *testing.T, port uint32, id string, blsPriv *
node := cmd.NewNode(config)
node.Params.UserAgent = id
node.Params.ProtocolVersion = lib.ProtocolVersion2
node.Params.VersionNegotiationTimeout = 1 * time.Second
node.Params.VerackNegotiationTimeout = 1 * time.Second
return node
}
85 changes: 61 additions & 24 deletions lib/connection_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/wire"
"github.com/deso-protocol/core/bls"
"github.com/deso-protocol/core/collections"
"github.com/golang/glog"
"github.com/pkg/errors"
"net"
Expand All @@ -13,12 +14,12 @@ import (
"time"
)

type GetActiveValidatorsFunc func() map[bls.SerializedPublicKey]*ValidatorEntry
type GetActiveValidatorsFunc func() *collections.ConcurrentMap[bls.SerializedPublicKey, *ValidatorEntry]

var GetActiveValidatorImpl GetActiveValidatorsFunc = BasicGetActiveValidators

func BasicGetActiveValidators() map[bls.SerializedPublicKey]*ValidatorEntry {
return make(map[bls.SerializedPublicKey]*ValidatorEntry)
func BasicGetActiveValidators() *collections.ConcurrentMap[bls.SerializedPublicKey, *ValidatorEntry] {
return collections.NewConcurrentMap[bls.SerializedPublicKey, *ValidatorEntry]()
}

// ConnectionController is a structure that oversees all connections to remote nodes. It is responsible for kicking off
Expand Down Expand Up @@ -78,14 +79,15 @@ func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handsh
}

func (cc *ConnectionController) Start() {
cc.startGroup.Add(2)
cc.startGroup.Add(3)
cc.initiatePersistentConnections()
// Start the validator connector
go cc.startValidatorConnector()
go cc.startNonValidatorConnector()
go cc.startRemoteNodeCleanup()

cc.startGroup.Wait()
cc.exitGroup.Add(2)
cc.exitGroup.Add(3)
}

func (cc *ConnectionController) Stop() {
Expand Down Expand Up @@ -151,6 +153,21 @@ func (cc *ConnectionController) startNonValidatorConnector() {
}
}

func (cc *ConnectionController) startRemoteNodeCleanup() {
cc.startGroup.Done()

for {
select {
case <-cc.exitChan:
cc.exitGroup.Done()
return
case <-time.After(1 * time.Second):
//cc.rnManager.Cleanup()
}
}

}

// ###########################
// ## Handlers (Peer, DeSoMessage)
// ###########################
Expand Down Expand Up @@ -199,7 +216,7 @@ func (cc *ConnectionController) _handleNewConnectionMessage(origin *Peer, desoMs
remoteNode, err = cc.processInboundConnection(msg.Connection)
if err != nil {
glog.Errorf("ConnectionController.handleNewConnectionMessage: Problem handling inbound connection: %v", err)
msg.Connection.Close()
cc.cleanupFailedInboundConnection(remoteNode, msg.Connection)
return
}
case ConnectionTypeOutbound:
Expand All @@ -215,6 +232,13 @@ func (cc *ConnectionController) _handleNewConnectionMessage(origin *Peer, desoMs
cc.handshake.InitiateHandshake(remoteNode)
}

func (cc *ConnectionController) cleanupFailedInboundConnection(remoteNode *RemoteNode, connection Connection) {
if remoteNode != nil {
cc.rnManager.Disconnect(remoteNode)
}
connection.Close()
}

func (cc *ConnectionController) cleanupFailedOutboundConnection(connection Connection) {
oc, ok := connection.(*outboundConnection)
if !ok {
Expand All @@ -226,6 +250,7 @@ func (cc *ConnectionController) cleanupFailedOutboundConnection(connection Conne
if rn != nil {
cc.rnManager.Disconnect(rn)
}
oc.Close()
cc.cmgr.RemoveAttemptedOutboundAddrs(oc.address)
}

Expand All @@ -235,14 +260,14 @@ func (cc *ConnectionController) cleanupFailedOutboundConnection(connection Conne

// refreshValidatorIndex re-indexes validators based on the activeValidatorsMap. It is called periodically by the
// validator connector.
func (cc *ConnectionController) refreshValidatorIndex(activeValidatorsMap map[bls.SerializedPublicKey]*ValidatorEntry) {
func (cc *ConnectionController) refreshValidatorIndex(activeValidatorsMap *collections.ConcurrentMap[bls.SerializedPublicKey, *ValidatorEntry]) {
// De-index inactive validators. We skip any checks regarding RemoteNodes connection status, nor do we verify whether
// de-indexing the validator would result in an excess number of outbound/inbound connections. Any excess connections
// will be cleaned up by the peer connector.
validatorRemoteNodeMap := cc.rnManager.GetValidatorIndex().Copy()
for pk, rn := range validatorRemoteNodeMap {
// If the validator is no longer active, de-index it.
if _, ok := activeValidatorsMap[pk]; !ok {
if _, ok := activeValidatorsMap.Get(pk); !ok {
cc.rnManager.SetNonValidator(rn)
cc.rnManager.UnsetValidator(rn)
}
Expand All @@ -267,7 +292,7 @@ func (cc *ConnectionController) refreshValidatorIndex(activeValidatorsMap map[bl
}

// If the RemoteNode turns out to be in the validator set, index it.
if _, ok := activeValidatorsMap[pk.Serialize()]; ok {
if _, ok := activeValidatorsMap.Get(pk.Serialize()); ok {
cc.rnManager.SetValidator(rn)
cc.rnManager.UnsetNonValidator(rn)
}
Expand All @@ -276,13 +301,14 @@ func (cc *ConnectionController) refreshValidatorIndex(activeValidatorsMap map[bl

// connectValidators attempts to connect to all active validators that are not already connected. It is called
// periodically by the validator connector.
func (cc *ConnectionController) connectValidators(activeValidatorsMap map[bls.SerializedPublicKey]*ValidatorEntry) {
func (cc *ConnectionController) connectValidators(activeValidatorsMap *collections.ConcurrentMap[bls.SerializedPublicKey, *ValidatorEntry]) {
// Look through the active validators and connect to any that we're not already connected to.
if cc.blsKeystore == nil {
return
}

for pk, validator := range activeValidatorsMap {
validators := activeValidatorsMap.Copy()
for pk, validator := range validators {
_, exists := cc.rnManager.GetValidatorIndex().Get(pk)
// If we're already connected to the validator, continue.
if exists {
Expand Down Expand Up @@ -523,30 +549,41 @@ func (cc *ConnectionController) processOutboundConnection(conn Connection) (*Rem
cc.AddrMgr.Good(oc.address)
}

// if this is a non-persistent outbound peer, and we already have enough outbound peers, then don't bother adding this one.
if !oc.isPersistent && cc.enoughNonValidatorOutboundConnections() {
return nil, fmt.Errorf("ConnectionController.handleOutboundConnection: Connected to maximum number of outbound "+
"peers (%d)", cc.targetOutboundPeers)
}

// If this is a non-persistent outbound peer and the group key overlaps with another peer we're already connected to then
// abort mission. We only connect to one peer per IP group in order to prevent Sybil attacks.
if !oc.isPersistent && cc.cmgr.IsFromRedundantOutboundIPAddress(oc.address) {
return nil, fmt.Errorf("ConnectionController.handleOutboundConnection: Rejecting OUTBOUND NON-PERSISTENT "+
"connection with redundant group key (%s).", addrmgr.GroupKey(oc.address))
}

na, err := cc.ConvertIPStringToNetAddress(oc.connection.RemoteAddr().String())
if err != nil {
return nil, errors.Wrapf(err, "ConnectionController.handleOutboundConnection: Problem calling ipToNetAddr "+
"for addr: (%s)", oc.connection.RemoteAddr().String())
}

// Attach the connection before additional validation steps because it is already established.
remoteNode, err := cc.rnManager.AttachOutboundConnection(oc.connection, na, oc.attemptId, oc.isPersistent)
if remoteNode == nil || err != nil {
return nil, errors.Wrapf(err, "ConnectionController.handleOutboundConnection: Problem calling rnManager.AttachOutboundConnection "+
"for addr: (%s)", oc.connection.RemoteAddr().String())
}

// If this is a persistent remote node or a validator, we don't need to do any extra connection validation.
if remoteNode.IsPersistent() || remoteNode.GetValidatorPublicKey() != nil {
return remoteNode, nil
}

// If we get here, it means we're dealing with a non-persistent or non-validator remote node. We perform additional
// connection validation.

// If we already have enough outbound peers, then don't bother adding this one.
if cc.enoughNonValidatorOutboundConnections() {
return nil, fmt.Errorf("ConnectionController.handleOutboundConnection: Connected to maximum number of outbound "+
"peers (%d)", cc.targetOutboundPeers)
}

// If the group key overlaps with another peer we're already connected to then abort mission. We only connect to
// one peer per IP group in order to prevent Sybil attacks.
if cc.cmgr.IsFromRedundantOutboundIPAddress(oc.address) {
return nil, fmt.Errorf("ConnectionController.handleOutboundConnection: Rejecting OUTBOUND NON-PERSISTENT "+
"connection with redundant group key (%s).", addrmgr.GroupKey(oc.address))
}
cc.cmgr.AddToGroupKey(na)

return remoteNode, nil
}

Expand Down
17 changes: 5 additions & 12 deletions lib/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ func NewConnectionManager(
// Check if the address passed shares a group with any addresses already in our data structures.
func (cmgr *ConnectionManager) IsFromRedundantOutboundIPAddress(na *wire.NetAddress) bool {
groupKey := addrmgr.GroupKey(na)
// For the sake of running multiple nodes on the same machine, we allow localhost connections.
if groupKey == "local" {
return false
}

cmgr.mtxOutboundConnIPGroups.Lock()
numGroupsForKey := cmgr.outboundConnIPGroups[groupKey]
Expand All @@ -185,7 +189,7 @@ func (cmgr *ConnectionManager) IsFromRedundantOutboundIPAddress(na *wire.NetAddr
return true
}

func (cmgr *ConnectionManager) addToGroupKey(na *wire.NetAddress) {
func (cmgr *ConnectionManager) AddToGroupKey(na *wire.NetAddress) {
groupKey := addrmgr.GroupKey(na)

cmgr.mtxOutboundConnIPGroups.Lock()
Expand Down Expand Up @@ -429,7 +433,6 @@ func (cmgr *ConnectionManager) addPeer(pp *Peer) {
// number of outbound peers. Also add the peer's address to
// our map.
if _, ok := peerList[pp.ID]; !ok {
cmgr.addToGroupKey(pp.netAddr)
atomic.AddUint32(&cmgr.numOutboundPeers, 1)

cmgr.mtxAddrsMaps.Lock()
Expand Down Expand Up @@ -528,16 +531,6 @@ func (cmgr *ConnectionManager) _logOutboundPeerData() {
numInboundPeers := int(atomic.LoadUint32(&cmgr.numInboundPeers))
numPersistentPeers := int(atomic.LoadUint32(&cmgr.numPersistentPeers))
glog.V(1).Infof("Num peers: OUTBOUND(%d) INBOUND(%d) PERSISTENT(%d)", numOutboundPeers, numInboundPeers, numPersistentPeers)

cmgr.mtxOutboundConnIPGroups.Lock()
for _, vv := range cmgr.outboundConnIPGroups {
if vv != 0 && vv != 1 {
glog.V(1).Infof("_logOutboundPeerData: Peer group count != (0 or 1). "+
"Is (%d) instead. This "+
"should never happen.", vv)
}
}
cmgr.mtxOutboundConnIPGroups.Unlock()
}

func (cmgr *ConnectionManager) AddTimeSample(addrStr string, timeSample time.Time) {
Expand Down
8 changes: 6 additions & 2 deletions lib/network_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ func (oc *outboundConnection) Close() {
if oc.terminated {
return
}
oc.connection.Close()
if oc.connection != nil {
oc.connection.Close()
}
oc.terminated = true
}

Expand All @@ -58,7 +60,9 @@ func (ic *inboundConnection) Close() {
return
}

ic.connection.Close()
if ic.connection != nil {
ic.connection.Close()
}
ic.terminated = true
}

Expand Down
Loading

0 comments on commit f1fe09a

Please sign in to comment.