Skip to content

Commit

Permalink
Add ConnectionController
Browse files Browse the repository at this point in the history
  • Loading branch information
AeonSw4n committed Dec 19, 2023
1 parent fa1f834 commit 0f1ea46
Show file tree
Hide file tree
Showing 5 changed files with 768 additions and 441 deletions.
252 changes: 13 additions & 239 deletions lib/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"math"
"net"
"strconv"
"sync/atomic"
"time"

Expand All @@ -14,7 +13,6 @@ import (
"github.com/decred/dcrd/lru"
"github.com/deso-protocol/go-deadlock"
"github.com/golang/glog"
"github.com/pkg/errors"
)

// connection_manager.go contains most of the logic for creating and managing
Expand All @@ -36,24 +34,10 @@ type ConnectionManager struct {
// doesn't need a reference to the Server object. But for now we keep things lazy.
srv *Server

// When --connectips is set, we don't connect to anything from the addrmgr.
connectIps []string

// The address manager keeps track of peer addresses we're aware of. When
// we need to connect to a new outbound peer, it chooses one of the addresses
// it's aware of at random and provides it to us.
AddrMgr *addrmgr.AddrManager
// The interfaces we listen on for new incoming connections.
listeners []net.Listener
// The parameters we are initialized with.
params *DeSoParams
// The target number of outbound peers we want to have.
targetOutboundPeers uint32
// The maximum number of inbound peers we allow.
maxInboundPeers uint32
// When true, only one connection per IP is allowed. Prevents eclipse attacks
// among other things.
limitOneInboundConnectionPerIP bool

// When --hypersync is set to true we will attempt fast block synchronization
HyperSync bool
Expand Down Expand Up @@ -139,10 +123,8 @@ type ConnectionManager struct {
}

func NewConnectionManager(
_params *DeSoParams, _addrMgr *addrmgr.AddrManager, _listeners []net.Listener,
_params *DeSoParams, _listeners []net.Listener,
_connectIps []string, _timeSource chainlib.MedianTimeSource,
_targetOutboundPeers uint32, _maxInboundPeers uint32,
_limitOneInboundConnectionPerIP bool,
_hyperSync bool,
_syncType NodeSyncType,
_stallTimeoutSeconds uint64,
Expand All @@ -153,16 +135,13 @@ func NewConnectionManager(
ValidateHyperSyncFlags(_hyperSync, _syncType)

return &ConnectionManager{
srv: _srv,
params: _params,
AddrMgr: _addrMgr,
listeners: _listeners,
connectIps: _connectIps,
srv: _srv,
params: _params,
listeners: _listeners,
// We keep track of the last N nonces we've sent in order to detect
// self connections.
sentNonces: lru.NewCache(1000),
timeSource: _timeSource,

//newestBlock: _newestBlock,

// Initialize the peer data structures.
Expand All @@ -180,25 +159,14 @@ func NewConnectionManager(
donePeerChan: make(chan *Peer, 100),
outboundConnectionChan: make(chan *outboundConnection, 100),

targetOutboundPeers: _targetOutboundPeers,
maxInboundPeers: _maxInboundPeers,
limitOneInboundConnectionPerIP: _limitOneInboundConnectionPerIP,
HyperSync: _hyperSync,
SyncType: _syncType,
serverMessageQueue: _serverMessageQueue,
stallTimeoutSeconds: _stallTimeoutSeconds,
minFeeRateNanosPerKB: _minFeeRateNanosPerKB,
HyperSync: _hyperSync,
SyncType: _syncType,
serverMessageQueue: _serverMessageQueue,
stallTimeoutSeconds: _stallTimeoutSeconds,
minFeeRateNanosPerKB: _minFeeRateNanosPerKB,
}
}

// GetAddrManager returns the address manager that tracks the peer addresses
// we are aware of, used when selecting new outbound connections.
func (cmgr *ConnectionManager) GetAddrManager() *addrmgr.AddrManager {
	return cmgr.AddrMgr
}

// SetTargetOutboundPeers updates the target number of outbound peer
// connections the connection manager tries to maintain.
//
// NOTE(review): this writes targetOutboundPeers without synchronization while
// enoughOutboundPeers reads it from another goroutine — presumably callers
// only invoke this during startup; confirm, or protect the field.
func (cmgr *ConnectionManager) SetTargetOutboundPeers(numPeers uint32) {
	cmgr.targetOutboundPeers = numPeers
}

// Check if the address passed shares a group with any addresses already in our
// data structures.
func (cmgr *ConnectionManager) IsFromRedundantOutboundIPAddress(na *wire.NetAddress) bool {
Expand Down Expand Up @@ -236,40 +204,6 @@ func (cmgr *ConnectionManager) subFromGroupKey(na *wire.NetAddress) {
cmgr.mtxOutboundConnIPGroups.Unlock()
}

// getRandomAddr asks the address manager for a random candidate address to
// use for a new outbound connection. It tries up to 100 times, skipping
// addresses we are already connected to and addresses whose /16 group is
// already represented among our outbound peers (a Sybil mitigation similar
// to Bitcoin's). Returns nil if no suitable address is found.
func (cmgr *ConnectionManager) getRandomAddr() *wire.NetAddress {
	for tries := 0; tries < 100; tries++ {
		addr := cmgr.AddrMgr.GetAddress()
		if addr == nil {
			// Fix: the old log referenced GetAddressWithExclusions, but the
			// call above is GetAddress.
			glog.V(2).Infof("ConnectionManager.getRandomAddr: addr from GetAddress was nil")
			break
		}
		// Hoist the repeated addr.NetAddress() calls into one local.
		netAddr := addr.NetAddress()

		// Lock the address map since multiple threads will be trying to read
		// and modify it at the same time.
		cmgr.mtxConnectedOutboundAddrs.RLock()
		alreadyConnected := cmgr.connectedOutboundAddrs[addrmgr.NetAddressKey(netAddr)]
		cmgr.mtxConnectedOutboundAddrs.RUnlock()
		if alreadyConnected {
			glog.V(2).Infof("ConnectionManager.getRandomAddr: Not choosing already connected address %v:%v", netAddr.IP, netAddr.Port)
			continue
		}

		// We can only have one outbound address per /16. This is similar to
		// Bitcoin and we do it to prevent Sybil attacks.
		if cmgr.IsFromRedundantOutboundIPAddress(netAddr) {
			glog.V(2).Infof("ConnectionManager.getRandomAddr: Not choosing address due to redundant group key %v:%v", netAddr.IP, netAddr.Port)
			continue
		}

		glog.V(2).Infof("ConnectionManager.getRandomAddr: Returning %v:%v at %d iterations",
			netAddr.IP, netAddr.Port, tries)
		return netAddr
	}

	glog.V(2).Infof("ConnectionManager.getRandomAddr: Returning nil")
	return nil
}

func _delayRetry(retryCount uint64, persistentAddrForLogging *wire.NetAddress, unit time.Duration) (_retryDuration time.Duration) {
// No delay if we haven't tried yet or if the number of retries isn't positive.
if retryCount <= 0 {
Expand All @@ -288,42 +222,6 @@ func _delayRetry(retryCount uint64, persistentAddrForLogging *wire.NetAddress, u
return retryDelay
}

// enoughOutboundPeers returns true when the current number of outbound peers
// meets or exceeds the configured target. Exceeding the target should not
// happen and is logged as an error.
func (cmgr *ConnectionManager) enoughOutboundPeers() bool {
	numOutbound := atomic.LoadUint32(&cmgr.numOutboundPeers)
	if numOutbound > cmgr.targetOutboundPeers {
		glog.Errorf("enoughOutboundPeers: Connected to too many outbound "+
			"peers: (%d). Should be "+
			"no more than (%d).", numOutbound, cmgr.targetOutboundPeers)
		return true
	}
	return numOutbound == cmgr.targetOutboundPeers
}

// IPToNetAddr converts an "ip" or "ip:port" string into a *wire.NetAddress.
// When ipStr carries no port, the DefaultSocketPort from params is used.
//
// It returns an error if an explicit port cannot be parsed as a uint16, or if
// the address manager cannot resolve the host portion.
func IPToNetAddr(ipStr string, addrMgr *addrmgr.AddrManager, params *DeSoParams) (*wire.NetAddress, error) {
	port := params.DefaultSocketPort
	host, portstr, err := net.SplitHostPort(ipStr)
	if err != nil {
		// No port specified so leave port=default and set
		// host to the ipStr.
		host = ipStr
	} else {
		pp, err := strconv.ParseUint(portstr, 10, 16)
		if err != nil {
			return nil, errors.Wrapf(err, "IPToNetAddr: Can not parse port from %s for ip", ipStr)
		}
		port = uint16(pp)
	}
	netAddr, err := addrMgr.HostToNetAddress(host, port, 0)
	if err != nil {
		// Fix: this wrap previously repeated the port-parsing message even
		// though it reports a host resolution failure.
		return nil, errors.Wrapf(err, "IPToNetAddr: Can not resolve host %s for ip %s", host, ipStr)
	}
	return netAddr, nil
}

func (cmgr *ConnectionManager) IsConnectedOutboundIpAddress(netAddr *wire.NetAddress) bool {
// Lock the address map since multiple threads will be trying to read
// and modify it at the same time.
Expand Down Expand Up @@ -412,30 +310,14 @@ func (cmgr *ConnectionManager) ConnectPeer(conn net.Conn, na *wire.NetAddress, a
return peer
}

func (cmgr *ConnectionManager) _isFromRedundantInboundIPAddress(addrToCheck net.Addr) bool {
func (cmgr *ConnectionManager) IsFromRedundantInboundIPAddress(netAddr *wire.NetAddress) bool {
cmgr.mtxPeerMaps.RLock()
defer cmgr.mtxPeerMaps.RUnlock()

// Loop through all the peers to see if any have the same IP
// address. This map is normally pretty small so doing this
// every time a Peer connects should be fine.
netAddr, err := IPToNetAddr(addrToCheck.String(), cmgr.AddrMgr, cmgr.params)
if err != nil {
// Return true in case we have an error. We do this because it
// will result in the peer connection not being accepted, which
// is desired in this case.
glog.Warningf(errors.Wrapf(err,
"ConnectionManager._isFromRedundantInboundIPAddress: Problem parsing "+
"net.Addr to wire.NetAddress so marking as redundant and not "+
"making connection").Error())
return true
}
if netAddr == nil {
glog.Warningf("ConnectionManager._isFromRedundantInboundIPAddress: " +
"address was nil after parsing so marking as redundant and not " +
"making connection")
return true
}

// If the IP is a localhost IP let it slide. This is useful for testing fake
// nodes on a local machine.
// TODO: Should this be a flag?
Expand Down Expand Up @@ -475,37 +357,6 @@ func (cmgr *ConnectionManager) _handleInboundConnections() {
continue
}

// As a quick check, reject the peer if we have too many already. Note that
// this check isn't perfect but we have a later check at the end after doing
// a version negotiation that will properly reject the peer if this check
// messes up e.g. due to a concurrency issue.
//
// TODO: We should instead have eviction logic here to prevent
// someone from monopolizing a node's inbound connections.
numInboundPeers := atomic.LoadUint32(&cmgr.numInboundPeers)
if numInboundPeers > cmgr.maxInboundPeers {

glog.Infof("Rejecting INBOUND peer (%s) due to max inbound peers (%d) hit.",
conn.RemoteAddr().String(), cmgr.maxInboundPeers)
conn.Close()

continue
}

// If we want to limit inbound connections to one per IP address, check to
// make sure this address isn't already connected.
if cmgr.limitOneInboundConnectionPerIP &&
cmgr._isFromRedundantInboundIPAddress(conn.RemoteAddr()) {

glog.Infof("Rejecting INBOUND peer (%s) due to already having an "+
"inbound connection from the same IP with "+
"limit_one_inbound_connection_per_ip set.",
conn.RemoteAddr().String())
conn.Close()

continue
}

cmgr.inboundConnectionChan <- &inboundConnection{
connection: conn,
}
Expand Down Expand Up @@ -661,28 +512,14 @@ func (cmgr *ConnectionManager) removePeer(pp *Peer) {
// Update the last seen time before we finish removing the peer.
// TODO: Really, we call 'Connected()' on removing a peer?
// I can't find a Disconnected() but seems odd.
cmgr.AddrMgr.Connected(pp.netAddr)
// FIXME: Move this to Done Peer
//cmgr.AddrMgr.Connected(pp.netAddr)

// Remove the peer from our data structure.
delete(peerList, pp.ID)
delete(cmgr.connectedPeers, pp.ID)
}

// _maybeReplacePeer dials a replacement connection when an outbound peer goes
// away so that we keep a fixed number of outbound connections. Inbound peers
// are never replaced.
func (cmgr *ConnectionManager) _maybeReplacePeer(pp *Peer) {
	if !pp.isOutbound {
		return
	}
	// Only persistent peers are redialed at their original address. For a
	// non-persistent peer we pass a nil address, which causes the dialer to
	// choose random addresses from the addrmgr until one works.
	var addrToDial *wire.NetAddress
	if pp.isPersistent {
		addrToDial = pp.netAddr
	}
	cmgr._dialOutboundConnection(addrToDial, pp.isPersistent)
}

func (cmgr *ConnectionManager) _logOutboundPeerData() {
numOutboundPeers := int(atomic.LoadUint32(&cmgr.numOutboundPeers))
numInboundPeers := int(atomic.LoadUint32(&cmgr.numInboundPeers))
Expand Down Expand Up @@ -791,68 +628,6 @@ func (cmgr *ConnectionManager) Start() {
Connection: ic,
},
}
case pp := <-cmgr.newPeerChan:
{
// We have successfully connected to a peer and it passed its version
// negotiation.

// if this is a non-persistent outbound peer and we already have enough
// outbound peers, then don't bother adding this one.
if !pp.isPersistent && pp.isOutbound && cmgr.enoughOutboundPeers() {
// TODO: Make this less verbose
glog.V(1).Infof("Dropping peer because we already have enough outbound peer connections.")
pp.Conn.Close()
continue
}

// If this is a non-persistent outbound peer and the group key
// overlaps with another peer we're already connected to then
// abort mission. We only connect to one peer per IP group in
// order to prevent Sybil attacks.
if pp.isOutbound &&
!pp.isPersistent &&
cmgr.IsFromRedundantOutboundIPAddress(pp.netAddr) {

// TODO: Make this less verbose
glog.Infof("Rejecting OUTBOUND NON-PERSISTENT peer (%v) with "+
"redundant group key (%s).",
pp, addrmgr.GroupKey(pp.netAddr))

pp.Conn.Close()
cmgr._maybeReplacePeer(pp)
continue
}

// Check that we have not exceeded the maximum number of inbound
// peers allowed.
//
// TODO: We should instead have eviction logic to prevent
// someone from monopolizing a node's inbound connections.
numInboundPeers := atomic.LoadUint32(&cmgr.numInboundPeers)
if !pp.isOutbound && numInboundPeers > cmgr.maxInboundPeers {

// TODO: Make this less verbose
glog.Infof("Rejecting INBOUND peer (%v) due to max inbound peers (%d) hit.",
pp, cmgr.maxInboundPeers)

pp.Conn.Close()
continue
}

// Now we can add the peer to our data structures.
pp._logAddPeer()
cmgr.addPeer(pp)

// Start the peer's message loop.
pp.Start()

// Signal the server about the new Peer in case it wants to do something with it.
cmgr.serverMessageQueue <- &ServerMessage{
Peer: pp,
Msg: &MsgDeSoNewPeer{},
}

}
case pp := <-cmgr.donePeerChan:
{
// By the time we get here, it can be assumed that the Peer's Disconnect function
Expand All @@ -866,7 +641,6 @@ func (cmgr *ConnectionManager) Start() {

// Potentially replace the peer. For example, if the Peer was an outbound Peer
// then we want to find a new peer in order to maintain our TargetOutboundPeers.
cmgr._maybeReplacePeer(pp)

// Signal the server about the Peer being done in case it wants to do something
// with it.
Expand Down
7 changes: 7 additions & 0 deletions lib/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,9 @@ type DeSoParams struct {
// The amount of time we wait to receive a version message from a peer.
VersionNegotiationTimeout time.Duration

// The maximum number of addresses to broadcast to peers.
MaxAddressesToBroadcast uint32

// The genesis block to use as the base of our chain.
GenesisBlock *MsgDeSoBlock
// The expected hash of the genesis block. Should align with what one
Expand Down Expand Up @@ -1019,6 +1022,8 @@ var DeSoMainnetParams = DeSoParams{
DialTimeout: 30 * time.Second,
VersionNegotiationTimeout: 30 * time.Second,

MaxAddressesToBroadcast: 10,

BlockRewardMaturity: time.Hour * 3,

V1DifficultyAdjustmentFactor: 10,
Expand Down Expand Up @@ -1288,6 +1293,8 @@ var DeSoTestnetParams = DeSoParams{
DialTimeout: 30 * time.Second,
VersionNegotiationTimeout: 30 * time.Second,

MaxAddressesToBroadcast: 10,

GenesisBlock: &GenesisBlock,
GenesisBlockHashHex: GenesisBlockHashHex,

Expand Down
Loading

0 comments on commit 0f1ea46

Please sign in to comment.