From 922ae567f23144b7b11fd007d5a160cdb3b14471 Mon Sep 17 00:00:00 2001 From: Piotr Nojszewski <29924594+AeonSw4n@users.noreply.github.com> Date: Thu, 11 Jan 2024 13:38:47 -0800 Subject: [PATCH] Revert "Another split" This reverts commit eaeec5875a84621b4888fc2a6104e9904e7ef53d. --- lib/connection_controller.go | 211 ++++++++++++++++++++++++++++++++++- lib/server.go | 204 --------------------------------- 2 files changed, 209 insertions(+), 206 deletions(-) diff --git a/lib/connection_controller.go b/lib/connection_controller.go index ae3a687cc..3b67bc995 100644 --- a/lib/connection_controller.go +++ b/lib/connection_controller.go @@ -4,9 +4,11 @@ import ( "fmt" "github.com/btcsuite/btcd/addrmgr" "github.com/btcsuite/btcd/wire" + "github.com/davecgh/go-spew/spew" "github.com/deso-protocol/core/bls" "github.com/deso-protocol/core/collections" "github.com/deso-protocol/core/consensus" + "github.com/deso-protocol/go-deadlock" "github.com/golang/glog" "github.com/pkg/errors" "net" @@ -36,6 +38,15 @@ type ConnectionController struct { // it's aware of at random and provides it to us. AddrMgr *addrmgr.AddrManager + // addrsToBroadcast is a list of all the addresses we've received from valid addr + // messages that we intend to broadcast to our peers. It is organized as: + // -> . + // + // It is organized in this way so that we can limit the number of addresses we + // are distributing for a single peer to avoid a DOS attack. + addrsToBroadcastLock deadlock.RWMutex + addrsToBroadcast map[string][]*SingleAddr + // When --connect-ips is set, we don't connect to anything from the addrmgr. connectIps []string // persistentIpToRemoteNodeIdsMap maps persistent IP addresses, like the --connect-ips, to the RemoteNodeIds of the @@ -76,6 +87,7 @@ func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handsh rnManager: rnManager, AddrMgr: addrMgr, connectIps: connectIps, + addrsToBroadcast: make(map[string][]*SingleAddr), persistentIpToRemoteNodeIdsMap: collections.NewConcurrentMap[string, RemoteNodeId](), validatorMap: collections.NewConcurrentMap[bls.SerializedPublicKey, consensus.Validator](), targetNonValidatorOutboundRemoteNodes: targetNonValidatorOutboundRemoteNodes, @@ -165,6 +177,67 @@ func (cc *ConnectionController) startNonValidatorConnector() { } } +// Must be run inside a goroutine. Relays addresses to peers at regular intervals +// and relays our own address to peers once every 24 hours. +func (cc *ConnectionController) startAddressRelayer() { + cc.startGroup.Done() + numMinutesPassed := 0 + for { + select { + case <-cc.exitChan: + cc.exitGroup.Done() + return + case <-time.After(AddrRelayIntervalSeconds * time.Second): + // For the first ten minutes after the connection controller starts, relay our address to all + // peers. After the first ten minutes, do it once every 24 hours. + glog.V(1).Infof("ConnectionController.startAddressRelayer: Relaying our own addr to peers") + if numMinutesPassed < 10 || numMinutesPassed%(RebroadcastNodeAddrIntervalMinutes) == 0 { + // TODO: Change to retrieve all RemoteNodes from the indexer. + for _, pp := range cc.cmgr.GetAllPeers() { + bestAddress := cc.AddrMgr.GetBestLocalAddress(pp.netAddr) + if bestAddress != nil { + glog.V(2).Infof("ConnectionController.startAddressRelayer: Relaying address %v to "+ + "peer %v", bestAddress.IP.String(), pp) + if err := cc.cmgr.SendMessage(&MsgDeSoAddr{ + AddrList: []*SingleAddr{ + { + Timestamp: time.Now(), + IP: bestAddress.IP, + Port: bestAddress.Port, + Services: (ServiceFlag)(bestAddress.Services), + }, + }, + }, pp.ID); err != nil { + glog.Errorf("ConnectionController.startAddressRelayer: Problem sending "+ + "MsgDeSoAddr to peer %v: %v", pp, err) + } + } + } + } + + glog.V(2).Infof("ConnectionController.startAddressRelayer: Seeing if there are addrs to relay...") + // Broadcast the addrs we have to all of our peers. + addrsToBroadcast := cc.getAddrsToBroadcast() + if len(addrsToBroadcast) == 0 { + glog.V(2).Infof("ConnectionController.startAddressRelayer: No addrs to relay.") + time.Sleep(AddrRelayIntervalSeconds * time.Second) + continue + } + + glog.V(2).Infof("ConnectionController.startAddressRelayer: Found %d addrs to "+ + "relay: %v", len(addrsToBroadcast), spew.Sdump(addrsToBroadcast)) + // Iterate over all our peers and broadcast the addrs to all of them. + for _, pp := range cc.cmgr.GetAllPeers() { + pp.AddDeSoMessage(&MsgDeSoAddr{ + AddrList: addrsToBroadcast, + }, false) + } + time.Sleep(AddrRelayIntervalSeconds * time.Second) + numMinutesPassed++ + } + } +} + func (cc *ConnectionController) startRemoteNodeCleanup() { cc.startGroup.Done() @@ -206,7 +279,70 @@ func (cc *ConnectionController) _handleAddrMessage(origin *Peer, desoMsg DeSoMes return } - // TODO + id := NewRemoteNodeId(origin.ID) + var msg *MsgDeSoAddr + var ok bool + if msg, ok = desoMsg.(*MsgDeSoAddr); !ok { + glog.Errorf("ConnectionController._handleAddrMessage: Problem decoding "+ + "MsgDeSoAddr: %v", spew.Sdump(desoMsg)) + cc.rnManager.DisconnectById(id) + return + } + + cc.addrsToBroadcastLock.Lock() + defer cc.addrsToBroadcastLock.Unlock() + + glog.V(1).Infof("ConnectionController._handleAddrMessage: Received Addr from peer %v with addrs %v", origin, spew.Sdump(msg.AddrList)) + + // If this addr message contains more than the maximum allowed number of addresses + // then disconnect this peer. + if len(msg.AddrList) > MaxAddrsPerAddrMsg { + glog.Errorf(fmt.Sprintf("ConnectionController._handleAddrMessage: Disconnecting "+ + "Peer %v for sending us an addr message with %d transactions, which exceeds "+ + "the max allowed %d", + origin, len(msg.AddrList), MaxAddrsPerAddrMsg)) + + cc.rnManager.DisconnectById(id) + return + } + + // Add all the addresses we received to the addrmgr. + netAddrsReceived := []*wire.NetAddress{} + for _, addr := range msg.AddrList { + addrAsNetAddr := wire.NewNetAddressIPPort(addr.IP, addr.Port, (wire.ServiceFlag)(addr.Services)) + if !addrmgr.IsRoutable(addrAsNetAddr) { + glog.V(1).Infof("Dropping address %v from peer %v because it is not routable", addr, origin) + continue + } + + netAddrsReceived = append( + netAddrsReceived, addrAsNetAddr) + } + cc.AddrMgr.AddAddresses(netAddrsReceived, origin.netAddr) + + // If the message had <= 10 addrs in it, then queue all the addresses for relaying + // on the next cycle. + if len(msg.AddrList) <= 10 { + glog.V(1).Infof("ConnectionController._handleAddrMessage: Queueing %d addrs for forwarding from "+ + "peer %v", len(msg.AddrList), origin) + sourceAddr := &SingleAddr{ + Timestamp: time.Now(), + IP: origin.netAddr.IP, + Port: origin.netAddr.Port, + Services: origin.serviceFlags, + } + listToAddTo, hasSeenSource := cc.addrsToBroadcast[sourceAddr.StringWithPort(false /*includePort*/)] + if !hasSeenSource { + listToAddTo = []*SingleAddr{} + } + // If this peer has been sending us a lot of little crap, evict a lot of their + // stuff but don't disconnect. + if len(listToAddTo) > MaxAddrsPerAddrMsg { + listToAddTo = listToAddTo[:MaxAddrsPerAddrMsg/2] + } + listToAddTo = append(listToAddTo, msg.AddrList...) + cc.addrsToBroadcast[sourceAddr.StringWithPort(false /*includePort*/)] = listToAddTo + } } func (cc *ConnectionController) _handleGetAddrMessage(origin *Peer, desoMsg DeSoMessage) { @@ -214,7 +350,39 @@ func (cc *ConnectionController) _handleGetAddrMessage(origin *Peer, desoMsg DeSo return } - // TODO + id := NewRemoteNodeId(origin.ID) + if _, ok := desoMsg.(*MsgDeSoGetAddr); !ok { + glog.Errorf("ConnectionController._handleAddrMessage: Problem decoding "+ + "MsgDeSoAddr: %v", spew.Sdump(desoMsg)) + cc.rnManager.DisconnectById(id) + return + } + + glog.V(1).Infof("Server._handleGetAddrMessage: Received GetAddr from peer %v", origin) + // When we get a GetAddr message, choose MaxAddrsPerMsg from the AddrMgr + // and send them back to the peer. + netAddrsFound := cc.AddrMgr.AddressCache() + if len(netAddrsFound) > MaxAddrsPerAddrMsg { + netAddrsFound = netAddrsFound[:MaxAddrsPerAddrMsg] + } + + // Convert the list to a SingleAddr list. + res := &MsgDeSoAddr{} + for _, netAddr := range netAddrsFound { + singleAddr := &SingleAddr{ + Timestamp: time.Now(), + IP: netAddr.IP, + Port: netAddr.Port, + Services: (ServiceFlag)(netAddr.Services), + } + res.AddrList = append(res.AddrList, singleAddr) + } + rn := cc.rnManager.GetRemoteNodeById(id) + if err := cc.rnManager.SendMessage(rn, res); err != nil { + glog.Errorf("Server._handleGetAddrMessage: Problem sending addr message to peer %v: %v", origin, err) + cc.rnManager.DisconnectById(id) + return + } } // _handleNewConnectionMessage is called when a new outbound or inbound connection is established. It is responsible @@ -733,3 +901,42 @@ func (cc *ConnectionController) isDuplicateInboundIPAddress(addr net.Addr) bool return cc.cmgr.IsDuplicateInboundIPAddress(netAddr) } + +func (cc *ConnectionController) getAddrsToBroadcast() []*SingleAddr { + cc.addrsToBroadcastLock.Lock() + defer cc.addrsToBroadcastLock.Unlock() + + // If there's nothing in the map, return. + if len(cc.addrsToBroadcast) == 0 { + return []*SingleAddr{} + } + + // If we get here then we have some addresses to broadcast. + addrsToBroadcast := []*SingleAddr{} + for uint32(len(addrsToBroadcast)) < cc.params.MaxAddressesToBroadcast && + len(cc.addrsToBroadcast) > 0 { + // Choose a key at random. This works because map iteration is random in golang. + bucket := "" + for kk := range cc.addrsToBroadcast { + bucket = kk + break + } + + // Remove the last element from the slice for the given bucket. + currentAddrList := cc.addrsToBroadcast[bucket] + if len(currentAddrList) > 0 { + lastIndex := len(currentAddrList) - 1 + currentAddr := currentAddrList[lastIndex] + currentAddrList = currentAddrList[:lastIndex] + if len(currentAddrList) == 0 { + delete(cc.addrsToBroadcast, bucket) + } else { + cc.addrsToBroadcast[bucket] = currentAddrList + } + + addrsToBroadcast = append(addrsToBroadcast, currentAddr) + } + } + + return addrsToBroadcast +} diff --git a/lib/server.go b/lib/server.go index 48850599e..17ce65e36 100644 --- a/lib/server.go +++ b/lib/server.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/hex" "fmt" - "github.com/btcsuite/btcd/wire" "github.com/deso-protocol/core/consensus" "net" "reflect" @@ -122,15 +121,6 @@ type Server struct { // requested data but have not yet received a response. requestedTransactionsMap map[BlockHash]*GetDataRequestInfo - // addrsToBroadcast is a list of all the addresses we've received from valid addr - // messages that we intend to broadcast to our peers. It is organized as: - // -> . - // - // It is organized in this way so that we can limit the number of addresses we - // are distributing for a single peer to avoid a DOS attack. - addrsToBroadcastLock deadlock.RWMutex - addrsToBroadcastt map[string][]*SingleAddr - // When set to true, we disable the ConnectionManager DisableNetworking bool @@ -588,9 +578,6 @@ func NewServer( srv.StartStatsdReporter() } - // Initialize the addrs to broadcast map. - srv.addrsToBroadcastt = make(map[string][]*SingleAddr) - // This will initialize the request queues. srv.ResetRequestQueues() @@ -2168,89 +2155,6 @@ func (srv *Server) StartStatsdReporter() { }() } -func (srv *Server) _handleAddrMessage(pp *Peer, msg *MsgDeSoAddr) { - srv.addrsToBroadcastLock.Lock() - defer srv.addrsToBroadcastLock.Unlock() - - glog.V(1).Infof("Server._handleAddrMessage: Received Addr from peer %v with addrs %v", pp, spew.Sdump(msg.AddrList)) - - // If this addr message contains more than the maximum allowed number of addresses - // then disconnect this peer. - if len(msg.AddrList) > MaxAddrsPerAddrMsg { - glog.Errorf(fmt.Sprintf("Server._handleAddrMessage: Disconnecting "+ - "Peer %v for sending us an addr message with %d transactions, which exceeds "+ - "the max allowed %d", - pp, len(msg.AddrList), MaxAddrsPerAddrMsg)) - pp.Disconnect() - return - } - - // Add all the addresses we received to the addrmgr. - netAddrsReceived := []*wire.NetAddress{} - for _, addr := range msg.AddrList { - addrAsNetAddr := wire.NewNetAddressIPPort(addr.IP, addr.Port, (wire.ServiceFlag)(addr.Services)) - if !addrmgr.IsRoutable(addrAsNetAddr) { - glog.V(1).Infof("Dropping address %v from peer %v because it is not routable", addr, pp) - continue - } - - netAddrsReceived = append( - netAddrsReceived, addrAsNetAddr) - } - // TODO: temporary - addressMgr := addrmgr.New("", net.LookupIP) - addressMgr.AddAddresses(netAddrsReceived, pp.netAddr) - - // If the message had <= 10 addrs in it, then queue all the addresses for relaying - // on the next cycle. - if len(msg.AddrList) <= 10 { - glog.V(1).Infof("Server._handleAddrMessage: Queueing %d addrs for forwarding from "+ - "peer %v", len(msg.AddrList), pp) - sourceAddr := &SingleAddr{ - Timestamp: time.Now(), - IP: pp.netAddr.IP, - Port: pp.netAddr.Port, - Services: pp.serviceFlags, - } - listToAddTo, hasSeenSource := srv.addrsToBroadcastt[sourceAddr.StringWithPort(false /*includePort*/)] - if !hasSeenSource { - listToAddTo = []*SingleAddr{} - } - // If this peer has been sending us a lot of little crap, evict a lot of their - // stuff but don't disconnect. - if len(listToAddTo) > MaxAddrsPerAddrMsg { - listToAddTo = listToAddTo[:MaxAddrsPerAddrMsg/2] - } - listToAddTo = append(listToAddTo, msg.AddrList...) - srv.addrsToBroadcastt[sourceAddr.StringWithPort(false /*includePort*/)] = listToAddTo - } -} - -func (srv *Server) _handleGetAddrMessage(pp *Peer, msg *MsgDeSoGetAddr) { - glog.V(1).Infof("Server._handleGetAddrMessage: Received GetAddr from peer %v", pp) - // When we get a GetAddr message, choose MaxAddrsPerMsg from the AddrMgr - // and send them back to the peer. - // TODO: temporary - addressMgr := addrmgr.New("", net.LookupIP) - netAddrsFound := addressMgr.AddressCache() - if len(netAddrsFound) > MaxAddrsPerAddrMsg { - netAddrsFound = netAddrsFound[:MaxAddrsPerAddrMsg] - } - - // Convert the list to a SingleAddr list. - res := &MsgDeSoAddr{} - for _, netAddr := range netAddrsFound { - singleAddr := &SingleAddr{ - Timestamp: time.Now(), - IP: netAddr.IP, - Port: netAddr.Port, - Services: (ServiceFlag)(netAddr.Services), - } - res.AddrList = append(res.AddrList, singleAddr) - } - pp.AddDeSoMessage(res, false) -} - func (srv *Server) _handleControlMessages(serverMessage *ServerMessage) (_shouldQuit bool) { switch serverMessage.Msg.(type) { // Control messages used internally to signal to the server. @@ -2398,20 +2302,6 @@ func (srv *Server) _startConsensus() { glog.V(2).Infof("Server._startConsensus: Handling message of type %v from Peer %v", serverMessage.Msg.GetMsgType(), serverMessage.Peer) - - // If the message is an addr message we handle it independent of whether or - // not the BitcoinManager is synced. - if serverMessage.Msg.GetMsgType() == MsgTypeAddr { - srv._handleAddrMessage(serverMessage.Peer, serverMessage.Msg.(*MsgDeSoAddr)) - continue - } - // If the message is a GetAddr message we handle it independent of whether or - // not the BitcoinManager is synced. - if serverMessage.Msg.GetMsgType() == MsgTypeGetAddr { - srv._handleGetAddrMessage(serverMessage.Peer, serverMessage.Msg.(*MsgDeSoGetAddr)) - continue - } - srv._handlePeerMessages(serverMessage) // Always check for and handle control messages regardless of whether the @@ -2432,98 +2322,6 @@ func (srv *Server) _startConsensus() { glog.V(2).Info("Server.Start: Server done") } -func (srv *Server) _getAddrsToBroadcast() []*SingleAddr { - srv.addrsToBroadcastLock.Lock() - defer srv.addrsToBroadcastLock.Unlock() - - // If there's nothing in the map, return. - if len(srv.addrsToBroadcastt) == 0 { - return []*SingleAddr{} - } - - // If we get here then we have some addresses to broadcast. - addrsToBroadcast := []*SingleAddr{} - for len(addrsToBroadcast) < 10 && len(srv.addrsToBroadcastt) > 0 { - // Choose a key at random. This works because map iteration is random in golang. - bucket := "" - for kk := range srv.addrsToBroadcastt { - bucket = kk - break - } - - // Remove the last element from the slice for the given bucket. - currentAddrList := srv.addrsToBroadcastt[bucket] - if len(currentAddrList) > 0 { - lastIndex := len(currentAddrList) - 1 - currentAddr := currentAddrList[lastIndex] - currentAddrList = currentAddrList[:lastIndex] - if len(currentAddrList) == 0 { - delete(srv.addrsToBroadcastt, bucket) - } else { - srv.addrsToBroadcastt[bucket] = currentAddrList - } - - addrsToBroadcast = append(addrsToBroadcast, currentAddr) - } - } - - return addrsToBroadcast -} - -// Must be run inside a goroutine. Relays addresses to peers at regular intervals -// and relays our own address to peers once every 24 hours. -func (srv *Server) _startAddressRelayer() { - for numMinutesPassed := 0; ; numMinutesPassed++ { - if atomic.LoadInt32(&srv.shutdown) >= 1 { - break - } - // For the first ten minutes after the server starts, relay our address to all - // peers. After the first ten minutes, do it once every 24 hours. - // TODO: temporary - addressMgr := addrmgr.New("", net.LookupIP) - glog.V(1).Infof("Server.Start._startAddressRelayer: Relaying our own addr to peers") - if numMinutesPassed < 10 || numMinutesPassed%(RebroadcastNodeAddrIntervalMinutes) == 0 { - for _, pp := range srv.cmgr.GetAllPeers() { - bestAddress := addressMgr.GetBestLocalAddress(pp.netAddr) - if bestAddress != nil { - glog.V(2).Infof("Server.Start._startAddressRelayer: Relaying address %v to "+ - "peer %v", bestAddress.IP.String(), pp) - pp.AddDeSoMessage(&MsgDeSoAddr{ - AddrList: []*SingleAddr{ - { - Timestamp: time.Now(), - IP: bestAddress.IP, - Port: bestAddress.Port, - Services: (ServiceFlag)(bestAddress.Services), - }, - }, - }, false) - } - } - } - - glog.V(2).Infof("Server.Start._startAddressRelayer: Seeing if there are addrs to relay...") - // Broadcast the addrs we have to all of our peers. - addrsToBroadcast := srv._getAddrsToBroadcast() - if len(addrsToBroadcast) == 0 { - glog.V(2).Infof("Server.Start._startAddressRelayer: No addrs to relay.") - time.Sleep(AddrRelayIntervalSeconds * time.Second) - continue - } - - glog.V(2).Infof("Server.Start._startAddressRelayer: Found %d addrs to "+ - "relay: %v", len(addrsToBroadcast), spew.Sdump(addrsToBroadcast)) - // Iterate over all our peers and broadcast the addrs to all of them. - for _, pp := range srv.cmgr.GetAllPeers() { - pp.AddDeSoMessage(&MsgDeSoAddr{ - AddrList: addrsToBroadcast, - }, false) - } - time.Sleep(AddrRelayIntervalSeconds * time.Second) - continue - } -} - func (srv *Server) _startTransactionRelayer() { // If we've set a maximum sync height, we will not relay transactions. if srv.blockchain.MaxSyncBlockHeight > 0 { @@ -2619,8 +2417,6 @@ func (srv *Server) Start() { go srv._startConsensus() - go srv._startAddressRelayer() - go srv._startTransactionRelayer() // Once the ConnectionManager is started, peers will be found and connected to and