From 575cbd413f9d969fc7b86ded1a0a30936d8dd070 Mon Sep 17 00:00:00 2001 From: Piotr Nojszewski <29924594+AeonSw4n@users.noreply.github.com> Date: Thu, 7 Dec 2023 05:21:42 -0800 Subject: [PATCH] Add ConnectionController --- integration_testing/connection_bridge.go | 85 ++- lib/connection_manager.go | 252 +------- lib/constants.go | 7 + lib/peer.go | 5 +- lib/pos_connection_controller.go | 723 +++++++++++++++++++++++ lib/server.go | 222 +------ 6 files changed, 850 insertions(+), 444 deletions(-) create mode 100644 lib/pos_connection_controller.go diff --git a/integration_testing/connection_bridge.go b/integration_testing/connection_bridge.go index 8ac55479c..d15141368 100644 --- a/integration_testing/connection_bridge.go +++ b/integration_testing/connection_bridge.go @@ -142,13 +142,12 @@ func (bridge *ConnectionBridge) createOutboundConnection(node *cmd.Node, otherNo na, err := lib.IPToNetAddr(conn.RemoteAddr().String(), otherNode.Server.GetConnectionManager().AddrMgr, otherNode.Params) - messagesFromPeer := make(chan *lib.ServerMessage) + messagesFromPeer := make(chan *lib.ServerMessage, 100) newPeerChan := make(chan *lib.Peer, 100) donePeerChan := make(chan *lib.Peer, 100) peer := lib.NewPeer(uint64(lib.RandInt64(math.MaxInt64)), uint64(lib.RandInt64(math.MaxInt64)), conn, false, na, false, 10000, 0, bridge.nodeB.Params, messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, newPeerChan, donePeerChan) - peer.ID = uint64(lib.RandInt64(math.MaxInt64)) bridge.newPeerChan <- peer //} }(ll) @@ -183,10 +182,90 @@ func (bridge *ConnectionBridge) getVersionMessage(node *cmd.Node) *lib.MsgDeSoVe return ver } +func ReadWithTimeout(readFunc func() error, readTimeout time.Duration) error { + errChan := make(chan error) + go func() { + errChan <- readFunc() + }() + select { + case err := <-errChan: + { + return err + } + case <-time.After(readTimeout): + { + return fmt.Errorf("ReadWithTimeout: Timed out reading message") + } + } +} + // startConnection starts the connection by performing version and verack exchange with // the provided connection, pretending to be the otherNode. func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode *cmd.Node) error { - // TODO: Update this + // Prepare the version message. + versionMessage := bridge.getVersionMessage(otherNode) + connection.VersionNonceSent = versionMessage.Nonce + + // Send the version message. + fmt.Println("Sending version message:", versionMessage, versionMessage.StartBlockHeight) + if err := connection.WriteDeSoMessage(versionMessage); err != nil { + return err + } + + // Wait for a response to the version message. + if err := ReadWithTimeout( + func() error { + msg, err := connection.ReadDeSoMessage() + if err != nil { + return err + } + + verMsg, ok := msg.(*lib.MsgDeSoVersion) + if !ok { + return err + } + + connection.VersionNonceReceived = verMsg.Nonce + connection.TimeConnected = time.Unix(verMsg.TstampSecs, 0) + connection.TimeOffsetSecs = verMsg.TstampSecs - time.Now().Unix() + return nil + }, lib.DeSoMainnetParams.VersionNegotiationTimeout); err != nil { + + return err + } + + // Now prepare the verack message. + verackMsg := lib.NewMessage(lib.MsgTypeVerack) + verackMsg.(*lib.MsgDeSoVerack).NonceReceived = connection.VersionNonceReceived + + // And send it to the connection. + if err := connection.WriteDeSoMessage(verackMsg); err != nil { + return err + } + + // And finally wait for connection's response to the verack message. + if err := ReadWithTimeout( + func() error { + msg, err := connection.ReadDeSoMessage() + if err != nil { + return err + } + + if msg.GetMsgType() != lib.MsgTypeVerack { + return fmt.Errorf("message is not verack! Type: %v", msg.GetMsgType()) + } + verackMsg := msg.(*lib.MsgDeSoVerack) + if verackMsg.NonceReceived != connection.VersionNonceSent { + return fmt.Errorf("verack message nonce doesn't match (received: %v, sent: %v)", + verackMsg.NonceReceived, connection.VersionNonceSent) + } + return nil + }, lib.DeSoMainnetParams.VersionNegotiationTimeout); err != nil { + + return err + } + connection.VersionNegotiated = true + return nil } diff --git a/lib/connection_manager.go b/lib/connection_manager.go index 932bef087..721ec6b2f 100644 --- a/lib/connection_manager.go +++ b/lib/connection_manager.go @@ -4,7 +4,6 @@ import ( "fmt" "math" "net" - "strconv" "sync/atomic" "time" @@ -14,7 +13,6 @@ import ( "github.com/decred/dcrd/lru" "github.com/deso-protocol/go-deadlock" "github.com/golang/glog" - "github.com/pkg/errors" ) // connection_manager.go contains most of the logic for creating and managing @@ -36,24 +34,10 @@ type ConnectionManager struct { // doesn't need a reference to the Server object. But for now we keep things lazy. srv *Server - // When --connectips is set, we don't connect to anything from the addrmgr. - connectIps []string - - // The address manager keeps track of peer addresses we're aware of. When - // we need to connect to a new outbound peer, it chooses one of the addresses - // it's aware of at random and provides it to us. - AddrMgr *addrmgr.AddrManager // The interfaces we listen on for new incoming connections. listeners []net.Listener // The parameters we are initialized with. params *DeSoParams - // The target number of outbound peers we want to have. - targetOutboundPeers uint32 - // The maximum number of inbound peers we allow. - maxInboundPeers uint32 - // When true, only one connection per IP is allowed. Prevents eclipse attacks - // among other things. - limitOneInboundConnectionPerIP bool // When --hypersync is set to true we will attempt fast block synchronization HyperSync bool @@ -139,10 +123,8 @@ type ConnectionManager struct { } func NewConnectionManager( - _params *DeSoParams, _addrMgr *addrmgr.AddrManager, _listeners []net.Listener, + _params *DeSoParams, _listeners []net.Listener, _connectIps []string, _timeSource chainlib.MedianTimeSource, - _targetOutboundPeers uint32, _maxInboundPeers uint32, - _limitOneInboundConnectionPerIP bool, _hyperSync bool, _syncType NodeSyncType, _stallTimeoutSeconds uint64, @@ -153,16 +135,13 @@ func NewConnectionManager( ValidateHyperSyncFlags(_hyperSync, _syncType) return &ConnectionManager{ - srv: _srv, - params: _params, - AddrMgr: _addrMgr, - listeners: _listeners, - connectIps: _connectIps, + srv: _srv, + params: _params, + listeners: _listeners, // We keep track of the last N nonces we've sent in order to detect // self connections. sentNonces: lru.NewCache(1000), timeSource: _timeSource, - //newestBlock: _newestBlock, // Initialize the peer data structures. @@ -180,25 +159,14 @@ func NewConnectionManager( donePeerChan: make(chan *Peer, 100), outboundConnectionChan: make(chan *outboundConnection, 100), - targetOutboundPeers: _targetOutboundPeers, - maxInboundPeers: _maxInboundPeers, - limitOneInboundConnectionPerIP: _limitOneInboundConnectionPerIP, - HyperSync: _hyperSync, - SyncType: _syncType, - serverMessageQueue: _serverMessageQueue, - stallTimeoutSeconds: _stallTimeoutSeconds, - minFeeRateNanosPerKB: _minFeeRateNanosPerKB, + HyperSync: _hyperSync, + SyncType: _syncType, + serverMessageQueue: _serverMessageQueue, + stallTimeoutSeconds: _stallTimeoutSeconds, + minFeeRateNanosPerKB: _minFeeRateNanosPerKB, } } -func (cmgr *ConnectionManager) GetAddrManager() *addrmgr.AddrManager { - return cmgr.AddrMgr -} - -func (cmgr *ConnectionManager) SetTargetOutboundPeers(numPeers uint32) { - cmgr.targetOutboundPeers = numPeers -} - // Check if the address passed shares a group with any addresses already in our // data structures. func (cmgr *ConnectionManager) IsFromRedundantOutboundIPAddress(na *wire.NetAddress) bool { @@ -236,40 +204,6 @@ func (cmgr *ConnectionManager) subFromGroupKey(na *wire.NetAddress) { cmgr.mtxOutboundConnIPGroups.Unlock() } -func (cmgr *ConnectionManager) getRandomAddr() *wire.NetAddress { - for tries := 0; tries < 100; tries++ { - addr := cmgr.AddrMgr.GetAddress() - if addr == nil { - glog.V(2).Infof("ConnectionManager.getRandomAddr: addr from GetAddressWithExclusions was nil") - break - } - - // Lock the address map since multiple threads will be trying to read - // and modify it at the same time. - cmgr.mtxConnectedOutboundAddrs.RLock() - ok := cmgr.connectedOutboundAddrs[addrmgr.NetAddressKey(addr.NetAddress())] - cmgr.mtxConnectedOutboundAddrs.RUnlock() - if ok { - glog.V(2).Infof("ConnectionManager.getRandomAddr: Not choosing already connected address %v:%v", addr.NetAddress().IP, addr.NetAddress().Port) - continue - } - - // We can only have one outbound address per /16. This is similar to - // Bitcoin and we do it to prevent Sybil attacks. - if cmgr.IsFromRedundantOutboundIPAddress(addr.NetAddress()) { - glog.V(2).Infof("ConnectionManager.getRandomAddr: Not choosing address due to redundant group key %v:%v", addr.NetAddress().IP, addr.NetAddress().Port) - continue - } - - glog.V(2).Infof("ConnectionManager.getRandomAddr: Returning %v:%v at %d iterations", - addr.NetAddress().IP, addr.NetAddress().Port, tries) - return addr.NetAddress() - } - - glog.V(2).Infof("ConnectionManager.getRandomAddr: Returning nil") - return nil -} - func _delayRetry(retryCount uint64, persistentAddrForLogging *wire.NetAddress, unit time.Duration) (_retryDuration time.Duration) { // No delay if we haven't tried yet or if the number of retries isn't positive. if retryCount <= 0 { @@ -288,42 +222,6 @@ func _delayRetry(retryCount uint64, persistentAddrForLogging *wire.NetAddress, u return retryDelay } -func (cmgr *ConnectionManager) enoughOutboundPeers() bool { - val := atomic.LoadUint32(&cmgr.numOutboundPeers) - if val > cmgr.targetOutboundPeers { - glog.Errorf("enoughOutboundPeers: Connected to too many outbound "+ - "peers: (%d). Should be "+ - "no more than (%d).", val, cmgr.targetOutboundPeers) - return true - } - - if val == cmgr.targetOutboundPeers { - return true - } - return false -} - -func IPToNetAddr(ipStr string, addrMgr *addrmgr.AddrManager, params *DeSoParams) (*wire.NetAddress, error) { - port := params.DefaultSocketPort - host, portstr, err := net.SplitHostPort(ipStr) - if err != nil { - // No port specified so leave port=default and set - // host to the ipStr. - host = ipStr - } else { - pp, err := strconv.ParseUint(portstr, 10, 16) - if err != nil { - return nil, errors.Wrapf(err, "IPToNetAddr: Can not parse port from %s for ip", ipStr) - } - port = uint16(pp) - } - netAddr, err := addrMgr.HostToNetAddress(host, port, 0) - if err != nil { - return nil, errors.Wrapf(err, "IPToNetAddr: Can not parse port from %s for ip", ipStr) - } - return netAddr, nil -} - func (cmgr *ConnectionManager) IsConnectedOutboundIpAddress(netAddr *wire.NetAddress) bool { // Lock the address map since multiple threads will be trying to read // and modify it at the same time. @@ -412,30 +310,14 @@ func (cmgr *ConnectionManager) ConnectPeer(conn net.Conn, na *wire.NetAddress, a return peer } -func (cmgr *ConnectionManager) _isFromRedundantInboundIPAddress(addrToCheck net.Addr) bool { +func (cmgr *ConnectionManager) IsFromRedundantInboundIPAddress(netAddr *wire.NetAddress) bool { cmgr.mtxPeerMaps.RLock() defer cmgr.mtxPeerMaps.RUnlock() // Loop through all the peers to see if any have the same IP // address. This map is normally pretty small so doing this // every time a Peer connects should be fine. - netAddr, err := IPToNetAddr(addrToCheck.String(), cmgr.AddrMgr, cmgr.params) - if err != nil { - // Return true in case we have an error. We do this because it - // will result in the peer connection not being accepted, which - // is desired in this case. - glog.Warningf(errors.Wrapf(err, - "ConnectionManager._isFromRedundantInboundIPAddress: Problem parsing "+ - "net.Addr to wire.NetAddress so marking as redundant and not "+ - "making connection").Error()) - return true - } - if netAddr == nil { - glog.Warningf("ConnectionManager._isFromRedundantInboundIPAddress: " + - "address was nil after parsing so marking as redundant and not " + - "making connection") - return true - } + // If the IP is a localhost IP let it slide. This is useful for testing fake // nodes on a local machine. // TODO: Should this be a flag? @@ -475,37 +357,6 @@ func (cmgr *ConnectionManager) _handleInboundConnections() { continue } - // As a quick check, reject the peer if we have too many already. Note that - // this check isn't perfect but we have a later check at the end after doing - // a version negotiation that will properly reject the peer if this check - // messes up e.g. due to a concurrency issue. - // - // TODO: We should instead have eviction logic here to prevent - // someone from monopolizing a node's inbound connections. - numInboundPeers := atomic.LoadUint32(&cmgr.numInboundPeers) - if numInboundPeers > cmgr.maxInboundPeers { - - glog.Infof("Rejecting INBOUND peer (%s) due to max inbound peers (%d) hit.", - conn.RemoteAddr().String(), cmgr.maxInboundPeers) - conn.Close() - - continue - } - - // If we want to limit inbound connections to one per IP address, check to - // make sure this address isn't already connected. - if cmgr.limitOneInboundConnectionPerIP && - cmgr._isFromRedundantInboundIPAddress(conn.RemoteAddr()) { - - glog.Infof("Rejecting INBOUND peer (%s) due to already having an "+ - "inbound connection from the same IP with "+ - "limit_one_inbound_connection_per_ip set.", - conn.RemoteAddr().String()) - conn.Close() - - continue - } - cmgr.inboundConnectionChan <- &inboundConnection{ connection: conn, } @@ -661,28 +512,14 @@ func (cmgr *ConnectionManager) removePeer(pp *Peer) { // Update the last seen time before we finish removing the peer. // TODO: Really, we call 'Connected()' on removing a peer? // I can't find a Disconnected() but seems odd. - cmgr.AddrMgr.Connected(pp.netAddr) + // FIXME: Move this to Done Peer + //cmgr.AddrMgr.Connected(pp.netAddr) // Remove the peer from our data structure. delete(peerList, pp.ID) delete(cmgr.connectedPeers, pp.ID) } -func (cmgr *ConnectionManager) _maybeReplacePeer(pp *Peer) { - // If the peer was outbound, replace her with a - // new peer to maintain a fixed number of outbound connections. - if pp.isOutbound { - // If the peer is not persistent then we don't want to pass an - // address to connectPeer. The lack of an address will cause it - // to choose random addresses from the addrmgr until one works. - na := pp.netAddr - if !pp.isPersistent { - na = nil - } - cmgr._dialOutboundConnection(na, pp.isPersistent) - } -} - func (cmgr *ConnectionManager) _logOutboundPeerData() { numOutboundPeers := int(atomic.LoadUint32(&cmgr.numOutboundPeers)) numInboundPeers := int(atomic.LoadUint32(&cmgr.numInboundPeers)) @@ -791,68 +628,6 @@ func (cmgr *ConnectionManager) Start() { Connection: ic, }, } - case pp := <-cmgr.newPeerChan: - { - // We have successfully connected to a peer and it passed its version - // negotiation. - - // if this is a non-persistent outbound peer and we already have enough - // outbound peers, then don't bother adding this one. - if !pp.isPersistent && pp.isOutbound && cmgr.enoughOutboundPeers() { - // TODO: Make this less verbose - glog.V(1).Infof("Dropping peer because we already have enough outbound peer connections.") - pp.Conn.Close() - continue - } - - // If this is a non-persistent outbound peer and the group key - // overlaps with another peer we're already connected to then - // abort mission. We only connect to one peer per IP group in - // order to prevent Sybil attacks. - if pp.isOutbound && - !pp.isPersistent && - cmgr.IsFromRedundantOutboundIPAddress(pp.netAddr) { - - // TODO: Make this less verbose - glog.Infof("Rejecting OUTBOUND NON-PERSISTENT peer (%v) with "+ - "redundant group key (%s).", - pp, addrmgr.GroupKey(pp.netAddr)) - - pp.Conn.Close() - cmgr._maybeReplacePeer(pp) - continue - } - - // Check that we have not exceeded the maximum number of inbound - // peers allowed. - // - // TODO: We should instead have eviction logic to prevent - // someone from monopolizing a node's inbound connections. - numInboundPeers := atomic.LoadUint32(&cmgr.numInboundPeers) - if !pp.isOutbound && numInboundPeers > cmgr.maxInboundPeers { - - // TODO: Make this less verbose - glog.Infof("Rejecting INBOUND peer (%v) due to max inbound peers (%d) hit.", - pp, cmgr.maxInboundPeers) - - pp.Conn.Close() - continue - } - - // Now we can add the peer to our data structures. - pp._logAddPeer() - cmgr.addPeer(pp) - - // Start the peer's message loop. - pp.Start() - - // Signal the server about the new Peer in case it wants to do something with it. - cmgr.serverMessageQueue <- &ServerMessage{ - Peer: pp, - Msg: &MsgDeSoNewPeer{}, - } - - } case pp := <-cmgr.donePeerChan: { // By the time we get here, it can be assumed that the Peer's Disconnect function @@ -866,7 +641,6 @@ func (cmgr *ConnectionManager) Start() { // Potentially replace the peer. For example, if the Peer was an outbound Peer // then we want to find a new peer in order to maintain our TargetOutboundPeers. - cmgr._maybeReplacePeer(pp) // Signal the server about the Peer being done in case it wants to do something // with it. diff --git a/lib/constants.go b/lib/constants.go index d6dbf1252..7a565bb6f 100644 --- a/lib/constants.go +++ b/lib/constants.go @@ -561,6 +561,9 @@ type DeSoParams struct { // The amount of time we wait to receive a version message from a peer. VersionNegotiationTimeout time.Duration + // The maximum number of addresses to broadcast to peers. + MaxAddressesToBroadcast uint32 + // The genesis block to use as the base of our chain. GenesisBlock *MsgDeSoBlock // The expected hash of the genesis block. Should align with what one @@ -1019,6 +1022,8 @@ var DeSoMainnetParams = DeSoParams{ DialTimeout: 30 * time.Second, VersionNegotiationTimeout: 30 * time.Second, + MaxAddressesToBroadcast: 10, + BlockRewardMaturity: time.Hour * 3, V1DifficultyAdjustmentFactor: 10, @@ -1288,6 +1293,8 @@ var DeSoTestnetParams = DeSoParams{ DialTimeout: 30 * time.Second, VersionNegotiationTimeout: 30 * time.Second, + MaxAddressesToBroadcast: 10, + GenesisBlock: &GenesisBlock, GenesisBlockHashHex: GenesisBlockHashHex, diff --git a/lib/peer.go b/lib/peer.go index 3bdc73fdc..9c0c33664 100644 --- a/lib/peer.go +++ b/lib/peer.go @@ -1212,11 +1212,12 @@ func (pp *Peer) Start() { // If the address manager needs more addresses, then send a GetAddr message // to the peer. This is best-effort. if pp.cmgr != nil { - if pp.cmgr.AddrMgr.NeedMoreAddresses() { + // TODO: Move this to ConnectionController. + /*if pp.cmgr.AddrMgr.NeedMoreAddresses() { go func() { pp.QueueMessage(&MsgDeSoGetAddr{}) }() - } + }*/ } // Send our verack message now that the IO processing machinery has started. diff --git a/lib/pos_connection_controller.go b/lib/pos_connection_controller.go new file mode 100644 index 000000000..c288775d7 --- /dev/null +++ b/lib/pos_connection_controller.go @@ -0,0 +1,723 @@ +package lib + +import ( + "fmt" + "github.com/btcsuite/btcd/addrmgr" + "github.com/btcsuite/btcd/wire" + "github.com/davecgh/go-spew/spew" + "github.com/deso-protocol/core/bls" + "github.com/deso-protocol/go-deadlock" + "github.com/golang/glog" + "github.com/pkg/errors" + "net" + "strconv" + "sync" + "time" +) + +type ConnectionController struct { + // The parameters we are initialized with. + params *DeSoParams + + server *Server + signer *BLSSigner + + handshake *HandshakeController + + rniManager *RemoteNodeIndexerManager + + validatorMapLock sync.Mutex + getActiveValidators func() map[bls.PublicKey]*ValidatorEntry + + // The address manager keeps track of peer addresses we're aware of. When + // we need to connect to a new outbound peer, it chooses one of the addresses + // it's aware of at random and provides it to us. + AddrMgr *addrmgr.AddrManager + + // addrsToBroadcast is a list of all the addresses we've received from valid addr + // messages that we intend to broadcast to our peers. It is organized as: + // -> . + // + // It is organized in this way so that we can limit the number of addresses we + // are distributing for a single peer to avoid a DOS attack. + addrsToBroadcastLock deadlock.RWMutex + addrsToBroadcast map[string][]*SingleAddr + + // When --connectips is set, we don't connect to anything from the addrmgr. + connectIps []string + + // The target number of outbound peers we want to have. + targetOutboundPeers uint32 + // The maximum number of inbound peers we allow. + maxInboundPeers uint32 + // When true, only one connection per IP is allowed. Prevents eclipse attacks + // among other things. + limitOneInboundConnectionPerIP bool + + startGroup sync.WaitGroup + exitChan chan struct{} + exitGroup sync.WaitGroup +} + +func NewConnectionController(params *DeSoParams, server *Server, rniManager *RemoteNodeIndexerManager, signer *BLSSigner, + addrMgr *addrmgr.AddrManager, targetOutboundPeers uint32, maxInboundPeers uint32, + limitOneInboundConnectionPerIP bool) *ConnectionController { + + return &ConnectionController{ + params: params, + server: server, + signer: signer, + rniManager: rniManager, + AddrMgr: addrMgr, + addrsToBroadcast: make(map[string][]*SingleAddr), + targetOutboundPeers: targetOutboundPeers, + maxInboundPeers: maxInboundPeers, + limitOneInboundConnectionPerIP: limitOneInboundConnectionPerIP, + exitChan: make(chan struct{}), + } +} + +func (cc *ConnectionController) Start() { + cc.startGroup.Add(3) + // Start the validator connector + go cc.startValidatorConnector() + + cc.startGroup.Wait() + cc.exitGroup.Add(3) +} + +func (cc *ConnectionController) Stop() { + close(cc.exitChan) + cc.exitGroup.Wait() +} + +func (cc *ConnectionController) initiatePersistentConnections() { + // This is a hack to make outbound connections go away. + if cc.targetOutboundPeers == 0 { + return + } + if len(cc.connectIps) > 0 { + // Connect to addresses passed via the --connect-ips flag. These addresses + // are persistent in the sense that if we disconnect from one, we will + // try to reconnect to the same one. + for _, connectIp := range cc.connectIps { + if err := cc.createPersistentOutboundConnection(connectIp); err != nil { + glog.Errorf("ConnectionController.initiatePersistentConnections: Problem connecting "+ + "to connectIp %v: %v", connectIp, err) + } + } + } +} + +func (cc *ConnectionController) startValidatorConnector() { + cc.startGroup.Done() + for { + select { + case <-cc.exitChan: + cc.exitGroup.Done() + return + case <-time.After(1 * time.Minute): + cc.validatorMapLock.Lock() + activeValidatorsMap := cc.getActiveValidators() + cc.refreshValidatorIndex(activeValidatorsMap) + cc.refreshValidatorAttemptedIndex(activeValidatorsMap) + cc.connectValidators(activeValidatorsMap) + cc.validatorMapLock.Unlock() + } + } +} + +func (cc *ConnectionController) startPeerConnector() { + cc.startGroup.Done() + + for { + select { + case <-cc.exitChan: + cc.exitGroup.Done() + return + case <-time.After(1 * time.Second): + // Only connect to addresses from the addrmgr if we don't specify --connect-ips. + // These addresses are *not* persistent, meaning if we disconnect from one we'll + // try a different one. + // TODO: Do we still want this? + if len(cc.connectIps) == 0 { + continue + } + + cc.refreshOutboundIndex() + cc.refreshInboundIndex() + cc.connectPeers() + } + } +} + +// Must be run inside a goroutine. Relays addresses to peers at regular intervals +// and relays our own address to peers once every 24 hours. +func (cc *ConnectionController) startAddressRelayer() { + cc.startGroup.Done() + numMinutesPassed := 0 + for { + select { + case <-cc.exitChan: + cc.exitGroup.Done() + return + case <-time.After(AddrRelayIntervalSeconds * time.Second): + // For the first ten minutes after the connection controller starts, relay our address to all + // peers. After the first ten minutes, do it once every 24 hours. + glog.V(1).Infof("ConnectionController.startAddressRelayer: Relaying our own addr to peers") + if numMinutesPassed < 10 || numMinutesPassed%(RebroadcastNodeAddrIntervalMinutes) == 0 { + // TODO: Change to retrieve all RemoteNodes from the indexer. + for _, pp := range cc.server.GetAllPeers() { + bestAddress := cc.AddrMgr.GetBestLocalAddress(pp.netAddr) + if bestAddress != nil { + glog.V(2).Infof("ConnectionController.startAddressRelayer: Relaying address %v to "+ + "peer %v", bestAddress.IP.String(), pp) + if err := cc.server.SendMessage(&MsgDeSoAddr{ + AddrList: []*SingleAddr{ + { + Timestamp: time.Now(), + IP: bestAddress.IP, + Port: bestAddress.Port, + Services: (ServiceFlag)(bestAddress.Services), + }, + }, + }, pp.ID); err != nil { + glog.Errorf("ConnectionController.startAddressRelayer: Problem sending "+ + "MsgDeSoAddr to peer %v: %v", pp, err) + } + } + } + } + + glog.V(2).Infof("ConnectionController.startAddressRelayer: Seeing if there are addrs to relay...") + // Broadcast the addrs we have to all of our peers. + addrsToBroadcast := cc.getAddrsToBroadcast() + if len(addrsToBroadcast) == 0 { + glog.V(2).Infof("ConnectionController.startAddressRelayer: No addrs to relay.") + time.Sleep(AddrRelayIntervalSeconds * time.Second) + continue + } + + glog.V(2).Infof("ConnectionController.startAddressRelayer: Found %d addrs to "+ + "relay: %v", len(addrsToBroadcast), spew.Sdump(addrsToBroadcast)) + // Iterate over all our peers and broadcast the addrs to all of them. + for _, pp := range cc.server.GetAllPeers() { + pp.AddDeSoMessage(&MsgDeSoAddr{ + AddrList: addrsToBroadcast, + }, false) + } + time.Sleep(AddrRelayIntervalSeconds * time.Second) + numMinutesPassed++ + } + } +} + +// ########################### +// ## Handlers (Peer, DeSoMessage) +// ########################### + +func (cc *ConnectionController) _handleDonePeerMessage(origin *Peer, desoMsg DeSoMessage) { + if desoMsg.GetMsgType() != MsgTypeDonePeer { + return + } + + cc.rniManager.RemovePeer(origin) +} + +func (cc *ConnectionController) _handleAddrMessage(origin *Peer, desoMsg DeSoMessage) { + if desoMsg.GetMsgType() != MsgTypeAddr { + return + } + + var msg *MsgDeSoAddr + var ok bool + if msg, ok = desoMsg.(*MsgDeSoAddr); !ok { + glog.Errorf("ConnectionController._handleAddrMessage: Problem decoding "+ + "MsgDeSoAddr: %v", spew.Sdump(desoMsg)) + cc.rniManager.DisconnectPeer(origin) + return + } + + cc.addrsToBroadcastLock.Lock() + defer cc.addrsToBroadcastLock.Unlock() + + glog.V(1).Infof("ConnectionController._handleAddrMessage: Received Addr from peer %v with addrs %v", origin, spew.Sdump(msg.AddrList)) + + // If this addr message contains more than the maximum allowed number of addresses + // then disconnect this peer. + if len(msg.AddrList) > MaxAddrsPerAddrMsg { + glog.Errorf(fmt.Sprintf("ConnectionController._handleAddrMessage: Disconnecting "+ + "Peer %v for sending us an addr message with %d transactions, which exceeds "+ + "the max allowed %d", + origin, len(msg.AddrList), MaxAddrsPerAddrMsg)) + + cc.rniManager.DisconnectPeer(origin) + return + } + + // Add all the addresses we received to the addrmgr. + netAddrsReceived := []*wire.NetAddress{} + for _, addr := range msg.AddrList { + addrAsNetAddr := wire.NewNetAddressIPPort(addr.IP, addr.Port, (wire.ServiceFlag)(addr.Services)) + if !addrmgr.IsRoutable(addrAsNetAddr) { + glog.V(1).Infof("Dropping address %v from peer %v because it is not routable", addr, origin) + continue + } + + netAddrsReceived = append( + netAddrsReceived, addrAsNetAddr) + } + cc.AddrMgr.AddAddresses(netAddrsReceived, origin.netAddr) + + // If the message had <= 10 addrs in it, then queue all the addresses for relaying + // on the next cycle. + if len(msg.AddrList) <= 10 { + glog.V(1).Infof("ConnectionController._handleAddrMessage: Queueing %d addrs for forwarding from "+ + "peer %v", len(msg.AddrList), origin) + sourceAddr := &SingleAddr{ + Timestamp: time.Now(), + IP: origin.netAddr.IP, + Port: origin.netAddr.Port, + Services: origin.serviceFlags, + } + listToAddTo, hasSeenSource := cc.addrsToBroadcast[sourceAddr.StringWithPort(false /*includePort*/)] + if !hasSeenSource { + listToAddTo = []*SingleAddr{} + } + // If this peer has been sending us a lot of little crap, evict a lot of their + // stuff but don't disconnect. + if len(listToAddTo) > MaxAddrsPerAddrMsg { + listToAddTo = listToAddTo[:MaxAddrsPerAddrMsg/2] + } + listToAddTo = append(listToAddTo, msg.AddrList...) + cc.addrsToBroadcast[sourceAddr.StringWithPort(false /*includePort*/)] = listToAddTo + } +} + +func (cc *ConnectionController) _handleGetAddrMessage(origin *Peer, desoMsg DeSoMessage) { + if desoMsg.GetMsgType() != MsgTypeGetAddr { + return + } + + if _, ok := desoMsg.(*MsgDeSoGetAddr); !ok { + glog.Errorf("ConnectionController._handleAddrMessage: Problem decoding "+ + "MsgDeSoAddr: %v", spew.Sdump(desoMsg)) + cc.rniManager.DisconnectPeer(origin) + return + } + + glog.V(1).Infof("Server._handleGetAddrMessage: Received GetAddr from peer %v", origin) + // When we get a GetAddr message, choose MaxAddrsPerMsg from the AddrMgr + // and send them back to the peer. + netAddrsFound := cc.AddrMgr.AddressCache() + if len(netAddrsFound) > MaxAddrsPerAddrMsg { + netAddrsFound = netAddrsFound[:MaxAddrsPerAddrMsg] + } + + // Convert the list to a SingleAddr list. + res := &MsgDeSoAddr{} + for _, netAddr := range netAddrsFound { + singleAddr := &SingleAddr{ + Timestamp: time.Now(), + IP: netAddr.IP, + Port: netAddr.Port, + Services: (ServiceFlag)(netAddr.Services), + } + res.AddrList = append(res.AddrList, singleAddr) + } + cc.rniManager.SendMessageToPeer(origin, res) +} + +func (cc *ConnectionController) _handleNewConnectionMessage(origin *Peer, desoMsg DeSoMessage) { + if desoMsg.GetMsgType() != MsgTypeNewConnection { + return + } + + var msg *MsgDeSoNewConnection + var ok bool + if msg, ok = desoMsg.(*MsgDeSoNewConnection); !ok { + return + } + + switch msg.Connection.GetConnectionType() { + case ConnectionTypeInbound: + if err := cc.processInboundConnection(msg.Connection); err != nil { + glog.Errorf("ConnectionController.handleNewConnectionMessage: Problem handling inbound connection: %v", err) + msg.Connection.Close() + return + } + case ConnectionTypeOutbound: + if err := cc.processOutboundConnection(msg.Connection); err != nil { + glog.Errorf("ConnectionController.handleNewConnectionMessage: Problem handling outbound connection: %v", err) + var oc *outboundConnection + if oc, ok = msg.Connection.(*outboundConnection); !ok { + return + } + id := NewRemoteNodeAttemptedId(oc.attemptId) + cc.rniManager.RemoveNonValidatorAttempted(id) + cc.server.RemoveAttemptedOutboundAddrs(oc.address) + msg.Connection.Close() + return + } + } +} + +// ########################### +// ## Validator Connections +// ########################### + +func (cc *ConnectionController) refreshValidatorIndex(activeValidatorsMap map[bls.PublicKey]*ValidatorEntry) { + // De-index inactive validators. + validatorRemoteNodeMap := cc.rniManager.GetRemoteNodeIndexer().GetValidatorIndex().GetIndex() + for pk, rn := range validatorRemoteNodeMap { + if _, ok := activeValidatorsMap[pk]; !ok { + cc.rniManager.UnsetValidator(pk, rn) + } + } + + // Look for validators in our existing outbound / inbound connections. + allNonValidators := cc.rniManager.GetAllNonValidators() + for _, rn := range allNonValidators { + meta := rn.GetHandshakeMetadata() + if meta == nil { + continue + } + pk := meta.GetValidatorPublicKey() + if _, ok := activeValidatorsMap[pk]; ok { + cc.rniManager.SetValidator(pk, rn) + } + } +} + +func (cc *ConnectionController) refreshValidatorAttemptedIndex(activeValidatorsMap map[bls.PublicKey]*ValidatorEntry) { + // Disconnect inactive attempted validators. + validatorRemoteNodeMap := cc.rniManager.GetRemoteNodeIndexer().GetValidatorAttemptedIndex().GetIndex() + for pk, rn := range validatorRemoteNodeMap { + if _, ok := activeValidatorsMap[pk]; !ok { + cc.rniManager.Disconnect(rn) + } + } +} + +func (cc *ConnectionController) connectValidators(activeValidatorsMap map[bls.PublicKey]*ValidatorEntry) { + for pk, validator := range activeValidatorsMap { + _, connected := cc.rniManager.GetRemoteNodeIndexer().GetValidatorIndex().Get(pk) + _, attempted := cc.rniManager.GetRemoteNodeIndexer().GetValidatorAttemptedIndex().Get(pk) + if !connected && !attempted { + // FIXME: for now we'll only use the first address in the ValidatorEntry + address := string(validator.Domains[0]) + if err := cc.createValidatorConnection(address, pk); err != nil { + // TODO: Do we want to log an error here? + continue + } + } + } +} + +// ########################### +// ## Peer Connections +// ########################### + +func (cc *ConnectionController) connectPeers() { + numConnectedOutboundPeers := cc.rniManager.GetNumConnectedOutboundPeers() + numAttemptedPeers := cc.rniManager.GetNumAttemptedNonValidators() + + remainingOutboundPeers := uint32(0) + if numConnectedOutboundPeers+numAttemptedPeers < cc.targetOutboundPeers { + remainingOutboundPeers = cc.targetOutboundPeers - (numConnectedOutboundPeers + numAttemptedPeers) + } + for ii := uint32(0); ii < remainingOutboundPeers; ii++ { + addr := cc.getRandomUnconnectedAddress() + cc.AddrMgr.Attempt(addr) + cc.rniManager.CreateOutboundConnectionNetAddress(addr) + } +} + +func (cc *ConnectionController) refreshOutboundIndex() { + numConnectedOutboundPeers := cc.rniManager.GetNumConnectedOutboundPeers() + numAttemptedPeers := cc.rniManager.GetNumAttemptedNonValidators() + + excessiveOutboundPeers := uint32(0) + if numConnectedOutboundPeers+numAttemptedPeers > cc.targetOutboundPeers { + excessiveOutboundPeers = numConnectedOutboundPeers + numAttemptedPeers - cc.targetOutboundPeers + } + // Disconnect random outbound peers if we have too many peers. + for ii := uint32(0); ii < excessiveOutboundPeers; ii++ { + rn, ok := cc.rniManager.GetRemoteNodeIndexer().GetNonValidatorOutboundIndex().GetRandom() + if !ok { + break + } + cc.rniManager.Disconnect(rn) + } +} + +func (cc *ConnectionController) refreshInboundIndex() { + numConnectedInboundPeers := cc.rniManager.GetNumConnectedInboundPeers() + + excessiveInboundPeers := uint32(0) + if numConnectedInboundPeers > cc.maxInboundPeers { + excessiveInboundPeers = numConnectedInboundPeers - cc.maxInboundPeers + } + // Disconnect random inbound peers if we have too many peers. + for ii := uint32(0); ii < excessiveInboundPeers; ii++ { + rn, ok := cc.rniManager.GetRemoteNodeIndexer().GetNonValidatorInboundIndex().GetRandom() + if !ok { + break + } + cc.rniManager.Disconnect(rn) + } +} + +func (cc *ConnectionController) getRandomUnconnectedAddress() *wire.NetAddress { + for tries := 0; tries < 100; tries++ { + addr := cc.AddrMgr.GetAddress() + if addr == nil { + //glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: addr from GetAddressWithExclusions was nil") + break + } + + if cc.server.IsConnectedOutboundIpAddress(addr.NetAddress()) { + //glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: Not choosing address due to redundancy %v:%v", addr.NetAddress().IP, addr.NetAddress().Port) + continue + } + + if cc.server.IsAttemptedOutboundIpAddress(addr.NetAddress()) { + continue + } + + // We can only have one outbound address per /16. This is similar to + // Bitcoin and we do it to prevent Sybil attacks. + if cc.server.IsFromRedundantOutboundIPAddress(addr.NetAddress()) { + //glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: Not choosing address due to redundant group key %v:%v", addr.NetAddress().IP, addr.NetAddress().Port) + continue + } + + return addr.NetAddress() + } + + //glog.V(2).Infof("ConnectionManager.getRandomAddr: Returning nil") + return nil +} + +func (cc *ConnectionController) SetTargetOutboundPeers(numPeers uint32) { + cc.targetOutboundPeers = numPeers +} + +func (cc *ConnectionController) enoughInboundPeers() bool { + return cc.rniManager.GetNumConnectedInboundPeers() >= cc.maxInboundPeers +} + +func (cc *ConnectionController) enoughOutboundPeers() bool { + return cc.rniManager.GetNumConnectedOutboundPeers() >= cc.targetOutboundPeers +} + +func (cc *ConnectionController) processInboundConnection(conn Connection) error { + var ic *inboundConnection + var ok bool + if ic, ok = conn.(*inboundConnection); !ok { + return fmt.Errorf("ConnectionController.handleInboundConnection: Connection is not an inboundConnection") + } + + // As a quick check, reject the peer if we have too many already. Note that + // this check isn't perfect but we have a later check at the end after doing + // a version negotiation that will properly reject the peer if this check + // messes up e.g. due to a concurrency issue. + // + // TODO: We should instead have eviction logic here to prevent + // someone from monopolizing a node's inbound connections. + if cc.enoughInboundPeers() { + return fmt.Errorf("ConnectionController.handleInboundConnection: Rejecting INBOUND peer (%s) due to max "+ + "inbound peers (%d) hit", ic.connection.RemoteAddr().String(), cc.maxInboundPeers) + } + + // If we want to limit inbound connections to one per IP address, check to + // make sure this address isn't already connected. + if cc.limitOneInboundConnectionPerIP && + cc.isFromRedundantInboundIPAddress(ic.connection.RemoteAddr()) { + + return fmt.Errorf("ConnectionController.handleInboundConnection: Rejecting INBOUND peer (%s) due to already having an "+ + "inbound connection from the same IP with limit_one_inbound_connection_per_ip set", + ic.connection.RemoteAddr().String()) + } + + na, err := cc.ConvertIPStringToNetAddress(ic.connection.RemoteAddr().String()) + if err != nil { + return errors.Wrapf(err, "ConnectionController.handleInboundConnection: Problem calling ipToNetAddr "+ + "for addr: (%s)", ic.connection.RemoteAddr().String()) + } + + remoteNode := NewRemoteNode() + if err := remoteNode.ConnectInboundPeer(ic.connection, na); err != nil { + return errors.Wrapf(err, "ConnectionController.handleInboundConnection: Problem calling ConnectInboundPeer "+ + "for addr: (%s)", ic.connection.RemoteAddr().String()) + } + cc.rniManager.AddRemoteNode(remoteNode) + cc.rniManager.SetNonValidatorInbound(remoteNode) + + return nil +} + +func (cc *ConnectionController) processOutboundConnection(conn Connection) error { + var oc *outboundConnection + var ok bool + if oc, ok = conn.(*outboundConnection); !ok { + return fmt.Errorf("ConnectionController.handleOutboundConnection: Connection is not an outboundConnection") + } + + if oc.failed { + return fmt.Errorf("ConnectionController.handleOutboundConnection: Failed to connect to peer (%s)", oc.address.IP.String()) + } + + if !oc.isPersistent { + cc.AddrMgr.Connected(oc.address) + cc.AddrMgr.Good(oc.address) + } + + // if this is a non-persistent outbound peer and we already have enough + // outbound peers, then don't bother adding this one. + if !oc.isPersistent && cc.enoughOutboundPeers() { + return fmt.Errorf("ConnectionController.handleOutboundConnection: Connected to maximum number of outbound "+ + "peers (%d)", cc.targetOutboundPeers) + } + + // If this is a non-persistent outbound peer and the group key + // overlaps with another peer we're already connected to then + // abort mission. We only connect to one peer per IP group in + // order to prevent Sybil attacks. + if !oc.isPersistent && cc.server.IsFromRedundantOutboundIPAddress(oc.address) { + + // TODO: Make this less verbose + return fmt.Errorf("ConnectionController.handleOutboundConnection: Rejecting OUTBOUND NON-PERSISTENT connection with "+ + "redundant group key (%s).", addrmgr.GroupKey(oc.address)) + } + + na, err := cc.ConvertIPStringToNetAddress(oc.connection.RemoteAddr().String()) + if err != nil { + return errors.Wrapf(err, "ConnectionController.handleOutboundConnection: Problem calling ipToNetAddr "+ + "for addr: (%s)", oc.connection.RemoteAddr().String()) + } + + remoteNode := NewRemoteNode() + if err := remoteNode.ConnectOutboundPeer(oc.connection, na, 0, false, false); err != nil { + return errors.Wrapf(err, "ConnectionController.handleOutboundConnection: Problem calling ConnectOutboundPeer "+ + "for addr: (%s)", oc.connection.RemoteAddr().String()) + } + cc.rniManager.AddRemoteNode(remoteNode) + cc.rniManager.SetNonValidatorOutbound(remoteNode) + + return nil +} + +func (cc *ConnectionController) createValidatorConnection(ipStr string, pk bls.PublicKey) (_err error) { + netAddr, err := cc.ConvertIPStringToNetAddress(ipStr) + if err != nil { + return err + } + cc.rniManager.CreateValidatorConnection(netAddr, pk) + return nil +} + +func (cc *ConnectionController) createPersistentOutboundConnection(ipStr string) (_err error) { + netAddr, err := cc.ConvertIPStringToNetAddress(ipStr) + if err != nil { + return err + } + cc.rniManager.CreatePersistentOutboundConnectionNetAddress(netAddr) + return nil +} + +func (cc *ConnectionController) ConvertIPStringToNetAddress(ipStr string) (*wire.NetAddress, error) { + netAddr, err := IPToNetAddr(ipStr, cc.AddrMgr, cc.params) + if err != nil { + return nil, errors.Wrapf(err, + "ConnectionController.ConvertIPStringToNetAddress: Problem parsing "+ + "ipString to wire.NetAddress") + } + if netAddr == nil { + return nil, fmt.Errorf("ConnectionController.ConvertIPStringToNetAddress: " + + "address was nil after parsing") + } + return netAddr, nil +} + +func IPToNetAddr(ipStr string, addrMgr *addrmgr.AddrManager, params *DeSoParams) (*wire.NetAddress, error) { + port := params.DefaultSocketPort + host, portstr, err := net.SplitHostPort(ipStr) + if err != nil { + // No port specified so leave port=default and set + // host to the ipStr. + host = ipStr + } else { + pp, err := strconv.ParseUint(portstr, 10, 16) + if err != nil { + return nil, errors.Wrapf(err, "IPToNetAddr: Can not parse port from %s for ip", ipStr) + } + port = uint16(pp) + } + netAddr, err := addrMgr.HostToNetAddress(host, port, 0) + if err != nil { + return nil, errors.Wrapf(err, "IPToNetAddr: Can not parse port from %s for ip", ipStr) + } + return netAddr, nil +} + +func (cc *ConnectionController) isFromRedundantInboundIPAddress(addr net.Addr) bool { + netAddr, err := IPToNetAddr(addr.String(), cc.AddrMgr, cc.params) + if err != nil { + // Return true in case we have an error. We do this because it + // will result in the peer connection not being accepted, which + // is desired in this case. + glog.Warningf(errors.Wrapf(err, + "ConnectionController._isFromRedundantInboundIPAddress: Problem parsing "+ + "net.Addr to wire.NetAddress so marking as redundant and not "+ + "making connection").Error()) + return true + } + if netAddr == nil { + glog.Warningf("ConnectionController._isFromRedundantInboundIPAddress: " + + "address was nil after parsing so marking as redundant and not " + + "making connection") + return true + } + + return cc.server.IsFromRedundantInboundIPAddress(netAddr) +} + +func (cc *ConnectionController) getAddrsToBroadcast() []*SingleAddr { + cc.addrsToBroadcastLock.Lock() + defer cc.addrsToBroadcastLock.Unlock() + + // If there's nothing in the map, return. + if len(cc.addrsToBroadcast) == 0 { + return []*SingleAddr{} + } + + // If we get here then we have some addresses to broadcast. + addrsToBroadcast := []*SingleAddr{} + for uint32(len(addrsToBroadcast)) < cc.params.MaxAddressesToBroadcast && + len(cc.addrsToBroadcast) > 0 { + // Choose a key at random. This works because map iteration is random in golang. + bucket := "" + for kk := range cc.addrsToBroadcast { + bucket = kk + break + } + + // Remove the last element from the slice for the given bucket. + currentAddrList := cc.addrsToBroadcast[bucket] + if len(currentAddrList) > 0 { + lastIndex := len(currentAddrList) - 1 + currentAddr := currentAddrList[lastIndex] + currentAddrList = currentAddrList[:lastIndex] + if len(currentAddrList) == 0 { + delete(cc.addrsToBroadcast, bucket) + } else { + cc.addrsToBroadcast[bucket] = currentAddrList + } + + addrsToBroadcast = append(addrsToBroadcast, currentAddr) + } + } + + return addrsToBroadcast +} diff --git a/lib/server.go b/lib/server.go index 75b8ec6fa..97bd7db88 100644 --- a/lib/server.go +++ b/lib/server.go @@ -62,7 +62,9 @@ type Server struct { eventManager *EventManager TxIndex *TXIndex + handshakeController *HandshakeController // fastHotStuffEventLoop consensus.FastHotStuffEventLoop + connectionController *ConnectionController // posMempool *PosMemPool TODO: Add the mempool later // All messages received from peers get sent from the ConnectionManager to the @@ -120,15 +122,6 @@ type Server struct { // requested data but have not yet received a response. requestedTransactionsMap map[BlockHash]*GetDataRequestInfo - // addrsToBroadcast is a list of all the addresses we've received from valid addr - // messages that we intend to broadcast to our peers. It is organized as: - // -> . - // - // It is organized in this way so that we can limit the number of addresses we - // are distributing for a single peer to avoid a DOS attack. - addrsToBroadcastLock deadlock.RWMutex - addrsToBroadcastt map[string][]*SingleAddr - // When set to true, we disable the ConnectionManager DisableNetworking bool @@ -445,8 +438,7 @@ func NewServer( // Create a new connection manager but note that it won't be initialized until Start(). _incomingMessages := make(chan *ServerMessage, (_targetOutboundPeers+_maxInboundPeers)*3) _cmgr := NewConnectionManager( - _params, _desoAddrMgr, _listeners, _connectIps, timesource, - _targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP, + _params, _listeners, _connectIps, timesource, _hyperSync, _syncType, _stallTimeoutSeconds, _minFeeRateNanosPerKB, _incomingMessages, srv) @@ -481,6 +473,12 @@ func NewServer( hex.EncodeToString(_chain.blockTip().Hash[:]), hex.EncodeToString(BigintToHash(_chain.blockTip().CumWork)[:])) + rniManager := NewRemoteNodeIndexerManager() + + srv.connectionController = NewConnectionController(_params, srv, rniManager, _blsKeystore.GetSigner(), _desoAddrMgr, + _targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP) + srv.handshakeController = NewHandshakeController(rniManager) + if srv.stateChangeSyncer != nil { srv.stateChangeSyncer.BlockHeight = uint64(_chain.headerTip().Height) } @@ -569,9 +567,6 @@ func NewServer( srv.StartStatsdReporter() } - // Initialize the addrs to broadcast map. - srv.addrsToBroadcastt = make(map[string][]*SingleAddr) - // This will initialize the request queues. srv.ResetRequestQueues() @@ -2149,94 +2144,19 @@ func (srv *Server) StartStatsdReporter() { }() } -func (srv *Server) _handleAddrMessage(pp *Peer, msg *MsgDeSoAddr) { - srv.addrsToBroadcastLock.Lock() - defer srv.addrsToBroadcastLock.Unlock() - - glog.V(1).Infof("Server._handleAddrMessage: Received Addr from peer %v with addrs %v", pp, spew.Sdump(msg.AddrList)) - - // If this addr message contains more than the maximum allowed number of addresses - // then disconnect this peer. - if len(msg.AddrList) > MaxAddrsPerAddrMsg { - glog.Errorf(fmt.Sprintf("Server._handleAddrMessage: Disconnecting "+ - "Peer %v for sending us an addr message with %d transactions, which exceeds "+ - "the max allowed %d", - pp, len(msg.AddrList), MaxAddrsPerAddrMsg)) - pp.Disconnect() - return - } - - // Add all the addresses we received to the addrmgr. - netAddrsReceived := []*wire.NetAddress{} - for _, addr := range msg.AddrList { - addrAsNetAddr := wire.NewNetAddressIPPort(addr.IP, addr.Port, (wire.ServiceFlag)(addr.Services)) - if !addrmgr.IsRoutable(addrAsNetAddr) { - glog.V(1).Infof("Dropping address %v from peer %v because it is not routable", addr, pp) - continue - } - - netAddrsReceived = append( - netAddrsReceived, addrAsNetAddr) - } - srv.cmgr.AddrMgr.AddAddresses(netAddrsReceived, pp.netAddr) - - // If the message had <= 10 addrs in it, then queue all the addresses for relaying - // on the next cycle. - if len(msg.AddrList) <= 10 { - glog.V(1).Infof("Server._handleAddrMessage: Queueing %d addrs for forwarding from "+ - "peer %v", len(msg.AddrList), pp) - sourceAddr := &SingleAddr{ - Timestamp: time.Now(), - IP: pp.netAddr.IP, - Port: pp.netAddr.Port, - Services: pp.serviceFlags, - } - listToAddTo, hasSeenSource := srv.addrsToBroadcastt[sourceAddr.StringWithPort(false /*includePort*/)] - if !hasSeenSource { - listToAddTo = []*SingleAddr{} - } - // If this peer has been sending us a lot of little crap, evict a lot of their - // stuff but don't disconnect. - if len(listToAddTo) > MaxAddrsPerAddrMsg { - listToAddTo = listToAddTo[:MaxAddrsPerAddrMsg/2] - } - listToAddTo = append(listToAddTo, msg.AddrList...) - srv.addrsToBroadcastt[sourceAddr.StringWithPort(false /*includePort*/)] = listToAddTo - } -} - -func (srv *Server) _handleGetAddrMessage(pp *Peer, msg *MsgDeSoGetAddr) { - glog.V(1).Infof("Server._handleGetAddrMessage: Received GetAddr from peer %v", pp) - // When we get a GetAddr message, choose MaxAddrsPerMsg from the AddrMgr - // and send them back to the peer. - netAddrsFound := srv.cmgr.AddrMgr.AddressCache() - if len(netAddrsFound) > MaxAddrsPerAddrMsg { - netAddrsFound = netAddrsFound[:MaxAddrsPerAddrMsg] - } - - // Convert the list to a SingleAddr list. - res := &MsgDeSoAddr{} - for _, netAddr := range netAddrsFound { - singleAddr := &SingleAddr{ - Timestamp: time.Now(), - IP: netAddr.IP, - Port: netAddr.Port, - Services: (ServiceFlag)(netAddr.Services), - } - res.AddrList = append(res.AddrList, singleAddr) - } - pp.AddDeSoMessage(res, false) -} - func (srv *Server) _handleControlMessages(serverMessage *ServerMessage) (_shouldQuit bool) { switch serverMessage.Msg.(type) { // Control messages used internally to signal to the server. case *MsgDeSoNewPeer: break case *MsgDeSoPeerHandshakeComplete: - srv._handlePeerHandshakeComplete(serverMessage.Peer) + srv._handleHandshakePeer(serverMessage.Peer) + srv.handshakeController._handleHandshakePeerMessage(serverMessage.Peer, serverMessage.Msg) case *MsgDeSoDonePeer: srv._handleDonePeer(serverMessage.Peer) + srv.connectionController._handleDonePeerMessage(serverMessage.Peer, serverMessage.Msg) + case *MsgDeSoNewConnection: + srv.connectionController._handleNewConnectionMessage(serverMessage.Peer, serverMessage.Msg) case *MsgDeSoQuit: return true } @@ -2248,6 +2168,10 @@ func (srv *Server) _handlePeerMessages(serverMessage *ServerMessage) { // Handle all non-control message types from our Peers. switch msg := serverMessage.Msg.(type) { // Messages sent among peers. + case *MsgDeSoAddr: + srv.connectionController._handleAddrMessage(serverMessage.Peer, serverMessage.Msg) + case *MsgDeSoGetAddr: + srv.connectionController._handleGetAddrMessage(serverMessage.Peer, serverMessage.Msg) case *MsgDeSoGetHeaders: srv._handleGetHeaders(serverMessage.Peer, msg) case *MsgDeSoHeaderBundle: @@ -2270,6 +2194,10 @@ func (srv *Server) _handlePeerMessages(serverMessage *ServerMessage) { srv._handleMempool(serverMessage.Peer, msg) case *MsgDeSoInv: srv._handleInv(serverMessage.Peer, msg) + case *MsgDeSoVersion: + srv.handshakeController._handleVersionMessage(serverMessage.Peer, serverMessage.Msg) + case *MsgDeSoVerack: + srv.handshakeController._handleVerackMessage(serverMessage.Peer, serverMessage.Msg) } } @@ -2366,20 +2294,6 @@ func (srv *Server) _startConsensus() { glog.V(2).Infof("Server._startConsensus: Handling message of type %v from Peer %v", serverMessage.Msg.GetMsgType(), serverMessage.Peer) - - // If the message is an addr message we handle it independent of whether or - // not the BitcoinManager is synced. - if serverMessage.Msg.GetMsgType() == MsgTypeAddr { - srv._handleAddrMessage(serverMessage.Peer, serverMessage.Msg.(*MsgDeSoAddr)) - continue - } - // If the message is a GetAddr message we handle it independent of whether or - // not the BitcoinManager is synced. - if serverMessage.Msg.GetMsgType() == MsgTypeGetAddr { - srv._handleGetAddrMessage(serverMessage.Peer, serverMessage.Msg.(*MsgDeSoGetAddr)) - continue - } - srv._handlePeerMessages(serverMessage) // Always check for and handle control messages regardless of whether the @@ -2400,96 +2314,6 @@ func (srv *Server) _startConsensus() { glog.V(2).Info("Server.Start: Server done") } -func (srv *Server) _getAddrsToBroadcast() []*SingleAddr { - srv.addrsToBroadcastLock.Lock() - defer srv.addrsToBroadcastLock.Unlock() - - // If there's nothing in the map, return. - if len(srv.addrsToBroadcastt) == 0 { - return []*SingleAddr{} - } - - // If we get here then we have some addresses to broadcast. - addrsToBroadcast := []*SingleAddr{} - for len(addrsToBroadcast) < 10 && len(srv.addrsToBroadcastt) > 0 { - // Choose a key at random. This works because map iteration is random in golang. - bucket := "" - for kk := range srv.addrsToBroadcastt { - bucket = kk - break - } - - // Remove the last element from the slice for the given bucket. - currentAddrList := srv.addrsToBroadcastt[bucket] - if len(currentAddrList) > 0 { - lastIndex := len(currentAddrList) - 1 - currentAddr := currentAddrList[lastIndex] - currentAddrList = currentAddrList[:lastIndex] - if len(currentAddrList) == 0 { - delete(srv.addrsToBroadcastt, bucket) - } else { - srv.addrsToBroadcastt[bucket] = currentAddrList - } - - addrsToBroadcast = append(addrsToBroadcast, currentAddr) - } - } - - return addrsToBroadcast -} - -// Must be run inside a goroutine. Relays addresses to peers at regular intervals -// and relays our own address to peers once every 24 hours. -func (srv *Server) _startAddressRelayer() { - for numMinutesPassed := 0; ; numMinutesPassed++ { - if atomic.LoadInt32(&srv.shutdown) >= 1 { - break - } - // For the first ten minutes after the server starts, relay our address to all - // peers. After the first ten minutes, do it once every 24 hours. - glog.V(1).Infof("Server.Start._startAddressRelayer: Relaying our own addr to peers") - if numMinutesPassed < 10 || numMinutesPassed%(RebroadcastNodeAddrIntervalMinutes) == 0 { - for _, pp := range srv.cmgr.GetAllPeers() { - bestAddress := srv.cmgr.AddrMgr.GetBestLocalAddress(pp.netAddr) - if bestAddress != nil { - glog.V(2).Infof("Server.Start._startAddressRelayer: Relaying address %v to "+ - "peer %v", bestAddress.IP.String(), pp) - pp.AddDeSoMessage(&MsgDeSoAddr{ - AddrList: []*SingleAddr{ - { - Timestamp: time.Now(), - IP: bestAddress.IP, - Port: bestAddress.Port, - Services: (ServiceFlag)(bestAddress.Services), - }, - }, - }, false) - } - } - } - - glog.V(2).Infof("Server.Start._startAddressRelayer: Seeing if there are addrs to relay...") - // Broadcast the addrs we have to all of our peers. - addrsToBroadcast := srv._getAddrsToBroadcast() - if len(addrsToBroadcast) == 0 { - glog.V(2).Infof("Server.Start._startAddressRelayer: No addrs to relay.") - time.Sleep(AddrRelayIntervalSeconds * time.Second) - continue - } - - glog.V(2).Infof("Server.Start._startAddressRelayer: Found %d addrs to "+ - "relay: %v", len(addrsToBroadcast), spew.Sdump(addrsToBroadcast)) - // Iterate over all our peers and broadcast the addrs to all of them. - for _, pp := range srv.cmgr.GetAllPeers() { - pp.AddDeSoMessage(&MsgDeSoAddr{ - AddrList: addrsToBroadcast, - }, false) - } - time.Sleep(AddrRelayIntervalSeconds * time.Second) - continue - } -} - func (srv *Server) _startTransactionRelayer() { // If we've set a maximum sync height, we will not relay transactions. if srv.blockchain.MaxSyncBlockHeight > 0 { @@ -2582,8 +2406,6 @@ func (srv *Server) Start() { go srv._startConsensus() - go srv._startAddressRelayer() - go srv._startTransactionRelayer() // Once the ConnectionManager is started, peers will be found and connected to and