PoS Validator Management #793

Closed
282 changes: 32 additions & 250 deletions lib/connection_manager.go
@@ -3,7 +3,6 @@ package lib
import (
"fmt"
"net"
"strconv"
"sync/atomic"
"time"

@@ -13,7 +12,6 @@ import (
"github.com/decred/dcrd/lru"
"github.com/deso-protocol/go-deadlock"
"github.com/golang/glog"
"github.com/pkg/errors"
)

// connection_manager.go contains most of the logic for creating and managing
@@ -35,24 +33,10 @@ type ConnectionManager struct {
// doesn't need a reference to the Server object. But for now we keep things lazy.
srv *Server

// When --connectips is set, we don't connect to anything from the addrmgr.
connectIps []string

// The address manager keeps track of peer addresses we're aware of. When
// we need to connect to a new outbound peer, it chooses one of the addresses
// it's aware of at random and provides it to us.
AddrMgr *addrmgr.AddrManager
// The interfaces we listen on for new incoming connections.
listeners []net.Listener
// The parameters we are initialized with.
params *DeSoParams
// The target number of outbound peers we want to have.
targetOutboundPeers uint32
// The maximum number of inbound peers we allow.
maxInboundPeers uint32
// When true, only one connection per IP is allowed. Prevents eclipse attacks
// among other things.
limitOneInboundConnectionPerIP bool

// When --hypersync is set to true we will attempt fast block synchronization
HyperSync bool
@@ -135,10 +119,8 @@ type ConnectionManager struct {
}

func NewConnectionManager(
_params *DeSoParams, _addrMgr *addrmgr.AddrManager, _listeners []net.Listener,
_params *DeSoParams, _listeners []net.Listener,
_connectIps []string, _timeSource chainlib.MedianTimeSource,
_targetOutboundPeers uint32, _maxInboundPeers uint32,
_limitOneInboundConnectionPerIP bool,
_hyperSync bool,
_syncType NodeSyncType,
_stallTimeoutSeconds uint64,
@@ -149,16 +131,13 @@ func NewConnectionManager(
ValidateHyperSyncFlags(_hyperSync, _syncType)

return &ConnectionManager{
srv: _srv,
params: _params,
AddrMgr: _addrMgr,
listeners: _listeners,
connectIps: _connectIps,
srv: _srv,
params: _params,
listeners: _listeners,
// We keep track of the last N nonces we've sent in order to detect
// self connections.
sentNonces: lru.NewCache(1000),
timeSource: _timeSource,

//newestBlock: _newestBlock,

// Initialize the peer data structures.
@@ -176,25 +155,14 @@ func NewConnectionManager(
donePeerChan: make(chan *Peer, 100),
outboundConnectionChan: make(chan *outboundConnection, 100),

targetOutboundPeers: _targetOutboundPeers,
maxInboundPeers: _maxInboundPeers,
limitOneInboundConnectionPerIP: _limitOneInboundConnectionPerIP,
HyperSync: _hyperSync,
SyncType: _syncType,
serverMessageQueue: _serverMessageQueue,
stallTimeoutSeconds: _stallTimeoutSeconds,
minFeeRateNanosPerKB: _minFeeRateNanosPerKB,
HyperSync: _hyperSync,
SyncType: _syncType,
serverMessageQueue: _serverMessageQueue,
stallTimeoutSeconds: _stallTimeoutSeconds,
minFeeRateNanosPerKB: _minFeeRateNanosPerKB,
}
}
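
Context note: the sentNonces cache initialized above exists to detect self connections, per the comment in the constructor; nonces the node has recently sent are remembered, and a remote peer that presents one of them back is the node talking to itself. A minimal standalone sketch of that check, using a plain map instead of the real LRU cache (names hypothetical, not part of this diff):

package main

import "fmt"

// nonceTracker is an illustrative stand-in for the sentNonces LRU cache:
// record every nonce we send, and treat a remote peer that echoes one of
// those nonces back as a connection to ourselves.
type nonceTracker struct {
	sent map[uint64]struct{}
}

func newNonceTracker() *nonceTracker {
	return &nonceTracker{sent: make(map[uint64]struct{})}
}

func (nt *nonceTracker) recordSent(nonce uint64) {
	nt.sent[nonce] = struct{}{}
}

func (nt *nonceTracker) isSelfConnection(remoteNonce uint64) bool {
	_, ok := nt.sent[remoteNonce]
	return ok
}

func main() {
	nt := newNonceTracker()
	nt.recordSent(42)
	fmt.Println(nt.isSelfConnection(42)) // true: we produced this nonce ourselves
	fmt.Println(nt.isSelfConnection(7))  // false: genuinely remote peer
}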

func (cmgr *ConnectionManager) GetAddrManager() *addrmgr.AddrManager {
return cmgr.AddrMgr
}

func (cmgr *ConnectionManager) SetTargetOutboundPeers(numPeers uint32) {
cmgr.targetOutboundPeers = numPeers
}

// Check if the address passed shares a group with any addresses already in our
// data structures.
func (cmgr *ConnectionManager) isRedundantGroupKey(na *wire.NetAddress) bool {
@@ -232,76 +200,6 @@ func (cmgr *ConnectionManager) subFromGroupKey(na *wire.NetAddress) {
cmgr.mtxOutboundConnIPGroups.Unlock()
}

func (cmgr *ConnectionManager) getRandomAddr() *wire.NetAddress {
for tries := 0; tries < 100; tries++ {
addr := cmgr.AddrMgr.GetAddress()
if addr == nil {
glog.V(2).Infof("ConnectionManager.getRandomAddr: addr from GetAddressWithExclusions was nil")
break
}

// Lock the address map since multiple threads will be trying to read
// and modify it at the same time.
cmgr.mtxConnectedOutboundAddrs.RLock()
ok := cmgr.connectedOutboundAddrs[addrmgr.NetAddressKey(addr.NetAddress())]
cmgr.mtxConnectedOutboundAddrs.RUnlock()
if ok {
glog.V(2).Infof("ConnectionManager.getRandomAddr: Not choosing already connected address %v:%v", addr.NetAddress().IP, addr.NetAddress().Port)
continue
}

// We can only have one outbound address per /16. This is similar to
// Bitcoin and we do it to prevent Sybil attacks.
if cmgr.isRedundantGroupKey(addr.NetAddress()) {
glog.V(2).Infof("ConnectionManager.getRandomAddr: Not choosing address due to redundant group key %v:%v", addr.NetAddress().IP, addr.NetAddress().Port)
continue
}

glog.V(2).Infof("ConnectionManager.getRandomAddr: Returning %v:%v at %d iterations",
addr.NetAddress().IP, addr.NetAddress().Port, tries)
return addr.NetAddress()
}

glog.V(2).Infof("ConnectionManager.getRandomAddr: Returning nil")
return nil
}
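
Context note: the getRandomAddr loop removed above skips addresses the node is already connected to and, like Bitcoin, allows at most one outbound connection per network group to resist Sybil attacks. A simplified, IPv4-only sketch of how an address maps to its /16 group; the code here relies on addrmgr.GroupKey, which also covers IPv6 and special-purpose ranges:

package main

import (
	"fmt"
	"net"
)

// groupKeyIPv4 buckets an IPv4 address by its first two octets (its /16).
// Non-IPv4 addresses fall back to the full string in this simplified sketch.
func groupKeyIPv4(ip net.IP) string {
	v4 := ip.To4()
	if v4 == nil {
		return ip.String()
	}
	return fmt.Sprintf("%d.%d.0.0/16", v4[0], v4[1])
}

func main() {
	fmt.Println(groupKeyIPv4(net.ParseIP("203.0.113.7")))  // 203.0.0.0/16
	fmt.Println(groupKeyIPv4(net.ParseIP("198.51.100.4"))) // 198.51.0.0/16
}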

func (cmgr *ConnectionManager) enoughOutboundPeers() bool {
val := atomic.LoadUint32(&cmgr.numOutboundPeers)
if val > cmgr.targetOutboundPeers {
glog.Errorf("enoughOutboundPeers: Connected to too many outbound "+
"peers: (%d). Should be "+
"no more than (%d).", val, cmgr.targetOutboundPeers)
return true
}

if val == cmgr.targetOutboundPeers {
return true
}
return false
}

func IPToNetAddr(ipStr string, addrMgr *addrmgr.AddrManager, params *DeSoParams) (*wire.NetAddress, error) {
port := params.DefaultSocketPort
host, portstr, err := net.SplitHostPort(ipStr)
if err != nil {
// No port specified so leave port=default and set
// host to the ipStr.
host = ipStr
} else {
pp, err := strconv.ParseUint(portstr, 10, 16)
if err != nil {
return nil, errors.Wrapf(err, "IPToNetAddr: Can not parse port from %s for ip", ipStr)
}
port = uint16(pp)
}
netAddr, err := addrMgr.HostToNetAddress(host, port, 0)
if err != nil {
return nil, errors.Wrapf(err, "IPToNetAddr: Can not parse port from %s for ip", ipStr)
}
return netAddr, nil
}
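
Context note: IPToNetAddr, removed here, resolved an "ip[:port]" string into a wire.NetAddress, falling back to the default socket port when none was supplied; with the new ConnectPeer signature below, callers hand in an already-resolved address instead. For reference, the host/port handling it performed looks roughly like this standalone sketch:

package main

import (
	"fmt"
	"net"
	"strconv"
)

// parseHostPort splits an "ip[:port]" string and applies a default port when
// none is present, mirroring the behavior of the removed IPToNetAddr before
// it handed the result to addrmgr.HostToNetAddress.
func parseHostPort(ipStr string, defaultPort uint16) (string, uint16, error) {
	host, portStr, err := net.SplitHostPort(ipStr)
	if err != nil {
		// No port specified, so keep the default and treat the whole string as the host.
		return ipStr, defaultPort, nil
	}
	pp, err := strconv.ParseUint(portStr, 10, 16)
	if err != nil {
		return "", 0, fmt.Errorf("parseHostPort: cannot parse port from %s: %v", ipStr, err)
	}
	return host, uint16(pp), nil
}

func main() {
	host, port, _ := parseHostPort("127.0.0.1:18000", 17000)
	fmt.Println(host, port) // 127.0.0.1 18000
	host, port, _ = parseHostPort("127.0.0.1", 17000)
	fmt.Println(host, port) // 127.0.0.1 17000
}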

func (cmgr *ConnectionManager) IsConnectedOutboundIpAddress(netAddr *wire.NetAddress) bool {
// Lock the address map since multiple threads will be trying to read
// and modify it at the same time.
@@ -353,15 +251,8 @@ func (cmgr *ConnectionManager) _createOutboundConnection(addr *wire.NetAddress,
// is set, then we will connect only to that addr. Otherwise, we will use
// the addrmgr to randomly select addrs and create OUTBOUND connections
// with them until we find a worthy peer.
func (cmgr *ConnectionManager) ConnectPeer(conn net.Conn, attemptId uint64, isOutbound bool, isPersistent bool) error {
// At this point Conn is set so create a peer object to do
// a version negotiation.
ipStr := conn.RemoteAddr().String()
na, err := IPToNetAddr(ipStr, cmgr.AddrMgr, cmgr.params)
if err != nil {
return errors.Wrapf(err, "ConnectOutboundConnection: Problem calling ipToNetAddr for addr: (%s)", conn.RemoteAddr().String())
}

func (cmgr *ConnectionManager) ConnectPeer(conn net.Conn, na *wire.NetAddress, attemptId uint64, isOutbound bool, isPersistent bool) *Peer {
// At this point Conn is set so create a peer object to do a version negotiation.
id := atomic.AddUint64(&cmgr.peerIndex, 1)
peer := NewPeer(id, attemptId, conn, isOutbound, na, isPersistent,
cmgr.stallTimeoutSeconds,
Expand All @@ -370,34 +261,33 @@ func (cmgr *ConnectionManager) ConnectPeer(conn net.Conn, attemptId uint64, isOu
cmgr.srv.incomingMessages, cmgr, cmgr.srv, cmgr.SyncType,
cmgr.newPeerChan, cmgr.donePeerChan)

cmgr.newPeerChan <- peer
return nil
// Now we can add the peer to our data structures.
peer._logAddPeer()
cmgr.addPeer(peer)

// Start the peer's message loop.
peer.Start()

// FIXME: Move this earlier
// Signal the server about the new Peer in case it wants to do something with it.
go func() {
cmgr.serverMessageQueue <- &ServerMessage{
Peer: peer,
Msg: &MsgDeSoNewPeer{},
}
}()

return peer
}
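
Context note: with this change ConnectPeer no longer resolves conn.RemoteAddr() itself; the caller supplies the wire.NetAddress alongside the connection, and ConnectPeer adds, starts, and announces the peer directly instead of pushing it onto newPeerChan. A hypothetical caller in the same package might look like the following (names illustrative, not from this PR):

// handleResolvedOutboundConnection is a usage sketch: the caller resolves the
// remote address before handing the connection to the connection manager.
func handleResolvedOutboundConnection(
	cmgr *ConnectionManager, conn net.Conn, na *wire.NetAddress, attemptId uint64) *Peer {
	// isOutbound=true, isPersistent=false for a non-persistent outbound dial.
	return cmgr.ConnectPeer(conn, na, attemptId, true, false)
}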

func (cmgr *ConnectionManager) _isFromRedundantInboundIPAddress(addrToCheck net.Addr) bool {
func (cmgr *ConnectionManager) _isFromRedundantInboundIPAddress(netAddr *wire.NetAddress) bool {
cmgr.mtxPeerMaps.RLock()
defer cmgr.mtxPeerMaps.RUnlock()

// Loop through all the peers to see if any have the same IP
// address. This map is normally pretty small so doing this
// every time a Peer connects should be fine.
netAddr, err := IPToNetAddr(addrToCheck.String(), cmgr.AddrMgr, cmgr.params)
if err != nil {
// Return true in case we have an error. We do this because it
// will result in the peer connection not being accepted, which
// is desired in this case.
glog.Warningf(errors.Wrapf(err,
"ConnectionManager._isFromRedundantInboundIPAddress: Problem parsing "+
"net.Addr to wire.NetAddress so marking as redundant and not "+
"making connection").Error())
return true
}
if netAddr == nil {
glog.Warningf("ConnectionManager._isFromRedundantInboundIPAddress: " +
"address was nil after parsing so marking as redundant and not " +
"making connection")
return true
}

// If the IP is a localhost IP let it slide. This is useful for testing fake
// nodes on a local machine.
// TODO: Should this be a flag?
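
Context note: _isFromRedundantInboundIPAddress now takes a *wire.NetAddress directly rather than re-parsing a net.Addr, which is why the error paths above are deleted. The check itself scans connected peers for a matching IP and exempts localhost/loopback addresses so several test nodes can share one machine. A standalone sketch of that decision (hypothetical names):

package main

import (
	"fmt"
	"net"
)

// isRedundantInboundIP reports whether an inbound connection should be treated
// as redundant: another peer is already connected from the same IP, unless the
// address is loopback, which keeps local multi-node testing working.
func isRedundantInboundIP(candidate net.IP, connected []net.IP) bool {
	if candidate.IsLoopback() {
		return false
	}
	for _, ip := range connected {
		if ip.Equal(candidate) {
			return true
		}
	}
	return false
}

func main() {
	connected := []net.IP{net.ParseIP("198.51.100.4")}
	fmt.Println(isRedundantInboundIP(net.ParseIP("198.51.100.4"), connected)) // true
	fmt.Println(isRedundantInboundIP(net.ParseIP("127.0.0.1"), connected))    // false
}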
@@ -437,37 +327,6 @@ func (cmgr *ConnectionManager) _handleInboundConnections() {
continue
}

// As a quick check, reject the peer if we have too many already. Note that
// this check isn't perfect but we have a later check at the end after doing
// a version negotiation that will properly reject the peer if this check
// messes up e.g. due to a concurrency issue.
//
// TODO: We should instead have eviction logic here to prevent
// someone from monopolizing a node's inbound connections.
numInboundPeers := atomic.LoadUint32(&cmgr.numInboundPeers)
if numInboundPeers > cmgr.maxInboundPeers {

glog.Infof("Rejecting INBOUND peer (%s) due to max inbound peers (%d) hit.",
conn.RemoteAddr().String(), cmgr.maxInboundPeers)
conn.Close()

continue
}

// If we want to limit inbound connections to one per IP address, check to
// make sure this address isn't already connected.
if cmgr.limitOneInboundConnectionPerIP &&
cmgr._isFromRedundantInboundIPAddress(conn.RemoteAddr()) {

glog.Infof("Rejecting INBOUND peer (%s) due to already having an "+
"inbound connection from the same IP with "+
"limit_one_inbound_connection_per_ip set.",
conn.RemoteAddr().String())
conn.Close()

continue
}

cmgr.inboundConnectionChan <- &inboundConnection{
connection: conn,
}
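
Context note: the two rejection checks deleted above (the max-inbound-peer cap and the one-connection-per-IP rule) previously gated connections before they reached inboundConnectionChan; this file no longer enforces them. For reference, the cap amounted to comparing an atomic counter against the configured maximum:

package main

import (
	"fmt"
	"sync/atomic"
)

// exceedsInboundLimit sketches the removed inbound cap: reject a new inbound
// connection once the atomically maintained counter exceeds the configured
// maximum.
func exceedsInboundLimit(numInboundPeers *uint32, maxInboundPeers uint32) bool {
	return atomic.LoadUint32(numInboundPeers) > maxInboundPeers
}

func main() {
	var numInbound uint32 = 126
	fmt.Println(exceedsInboundLimit(&numInbound, 125)) // true: over the limit, reject
	numInbound = 10
	fmt.Println(exceedsInboundLimit(&numInbound, 125)) // false: accept
}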
@@ -621,28 +480,14 @@ func (cmgr *ConnectionManager) removePeer(pp *Peer) {
// Update the last seen time before we finish removing the peer.
// TODO: Really, we call 'Connected()' on removing a peer?
// I can't find a Disconnected() but seems odd.
cmgr.AddrMgr.Connected(pp.netAddr)
// FIXME: Move this to Done Peer
//cmgr.AddrMgr.Connected(pp.netAddr)

// Remove the peer from our data structure.
delete(peerList, pp.ID)
delete(cmgr.connectedPeers, pp.ID)
}

func (cmgr *ConnectionManager) _maybeReplacePeer(pp *Peer) {
// If the peer was outbound, replace her with a
// new peer to maintain a fixed number of outbound connections.
if pp.isOutbound {
// If the peer is not persistent then we don't want to pass an
// address to connectPeer. The lack of an address will cause it
// to choose random addresses from the addrmgr until one works.
na := pp.netAddr
if !pp.isPersistent {
na = nil
}
cmgr._createOutboundConnection(na, pp.isPersistent)
}
}

func (cmgr *ConnectionManager) _logOutboundPeerData() {
numOutboundPeers := int(atomic.LoadUint32(&cmgr.numOutboundPeers))
numInboundPeers := int(atomic.LoadUint32(&cmgr.numInboundPeers))
@@ -720,68 +565,6 @@ func (cmgr *ConnectionManager) Start() {
Connection: ic,
},
}
case pp := <-cmgr.newPeerChan:
{
// We have successfully connected to a peer and it passed its version
// negotiation.

// if this is a non-persistent outbound peer and we already have enough
// outbound peers, then don't bother adding this one.
if !pp.isPersistent && pp.isOutbound && cmgr.enoughOutboundPeers() {
// TODO: Make this less verbose
glog.V(1).Infof("Dropping peer because we already have enough outbound peer connections.")
pp.Conn.Close()
continue
}

// If this is a non-persistent outbound peer and the group key
// overlaps with another peer we're already connected to then
// abort mission. We only connect to one peer per IP group in
// order to prevent Sybil attacks.
if pp.isOutbound &&
!pp.isPersistent &&
cmgr.isRedundantGroupKey(pp.netAddr) {

// TODO: Make this less verbose
glog.Infof("Rejecting OUTBOUND NON-PERSISTENT peer (%v) with "+
"redundant group key (%s).",
pp, addrmgr.GroupKey(pp.netAddr))

pp.Conn.Close()
cmgr._maybeReplacePeer(pp)
continue
}

// Check that we have not exceeded the maximum number of inbound
// peers allowed.
//
// TODO: We should instead have eviction logic to prevent
// someone from monopolizing a node's inbound connections.
numInboundPeers := atomic.LoadUint32(&cmgr.numInboundPeers)
if !pp.isOutbound && numInboundPeers > cmgr.maxInboundPeers {

// TODO: Make this less verbose
glog.Infof("Rejecting INBOUND peer (%v) due to max inbound peers (%d) hit.",
pp, cmgr.maxInboundPeers)

pp.Conn.Close()
continue
}

// Now we can add the peer to our data structures.
pp._logAddPeer()
cmgr.addPeer(pp)

// Start the peer's message loop.
pp.Start()

// Signal the server about the new Peer in case it wants to do something with it.
cmgr.serverMessageQueue <- &ServerMessage{
Peer: pp,
Msg: &MsgDeSoNewPeer{},
}

}
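
Context note: the deleted newPeerChan handler performed post-handshake admission control: a non-persistent outbound peer was dropped if the outbound target was already met or if it shared an IP group with an existing outbound peer, an inbound peer was dropped over the inbound cap, and only then was the peer added, started, and announced to the server (ConnectPeer above now performs those add/start/announce steps directly). Condensed into a single predicate, the removed gating was roughly the following (hypothetical helper):

package main

import "fmt"

// shouldRejectPeer condenses the admission checks removed from the
// newPeerChan handler into one decision.
func shouldRejectPeer(
	isOutbound, isPersistent, enoughOutbound, redundantGroup bool,
	numInbound, maxInbound uint32) bool {

	// Surplus or group-redundant non-persistent outbound peers are dropped.
	if isOutbound && !isPersistent && (enoughOutbound || redundantGroup) {
		return true
	}
	// Inbound peers beyond the configured cap are dropped.
	if !isOutbound && numInbound > maxInbound {
		return true
	}
	return false
}

func main() {
	fmt.Println(shouldRejectPeer(true, false, true, false, 0, 125))    // true: enough outbound peers already
	fmt.Println(shouldRejectPeer(false, false, false, false, 50, 125)) // false: inbound slot available
}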
case pp := <-cmgr.donePeerChan:
{
// By the time we get here, it can be assumed that the Peer's Disconnect function
Expand All @@ -795,7 +578,6 @@ func (cmgr *ConnectionManager) Start() {

// Potentially replace the peer. For example, if the Peer was an outbound Peer
// then we want to find a new peer in order to maintain our TargetOutboundPeers.
cmgr._maybeReplacePeer(pp)

// Signal the server about the Peer being done in case it wants to do something
// with it.