From 0df796ec6a5e99caddf88ff55d8e1e38fd516285 Mon Sep 17 00:00:00 2001 From: Piotr Nojszewski <29924594+AeonSw4n@users.noreply.github.com> Date: Mon, 6 Nov 2023 03:43:38 -0800 Subject: [PATCH] Revert "Split code again" This reverts commit e44c26f58cc012e9ff0781d9ab75584ea9d3d620. --- lib/connection_manager.go | 282 ++-------- lib/constants.go | 7 + lib/network.go | 5 +- lib/network_test.go | 4 +- lib/peer.go | 7 +- lib/pos_connection_controller.go | 723 +++++++++++++++++++++++++ lib/pos_consensus_controller.go | 2 - lib/pos_handshake_controller.go | 382 +++---------- lib/pos_remote_node.go | 456 ++++++++++++++++ lib/pos_remote_node_id.go | 57 ++ lib/pos_remote_node_indexer.go | 231 ++++++++ lib/pos_remote_node_indexer_manager.go | 165 ++++++ lib/server.go | 237 +------- 13 files changed, 1777 insertions(+), 781 deletions(-) create mode 100644 lib/pos_connection_controller.go create mode 100644 lib/pos_remote_node.go create mode 100644 lib/pos_remote_node_id.go create mode 100644 lib/pos_remote_node_indexer.go create mode 100644 lib/pos_remote_node_indexer_manager.go diff --git a/lib/connection_manager.go b/lib/connection_manager.go index fa177dab8..6d6d3f7d1 100644 --- a/lib/connection_manager.go +++ b/lib/connection_manager.go @@ -3,7 +3,6 @@ package lib import ( "fmt" "net" - "strconv" "sync/atomic" "time" @@ -13,7 +12,6 @@ import ( "github.com/decred/dcrd/lru" "github.com/deso-protocol/go-deadlock" "github.com/golang/glog" - "github.com/pkg/errors" ) // connection_manager.go contains most of the logic for creating and managing @@ -35,24 +33,10 @@ type ConnectionManager struct { // doesn't need a reference to the Server object. But for now we keep things lazy. srv *Server - // When --connectips is set, we don't connect to anything from the addrmgr. - connectIps []string - - // The address manager keeps track of peer addresses we're aware of. When - // we need to connect to a new outbound peer, it chooses one of the addresses - // it's aware of at random and provides it to us. - AddrMgr *addrmgr.AddrManager // The interfaces we listen on for new incoming connections. listeners []net.Listener // The parameters we are initialized with. params *DeSoParams - // The target number of outbound peers we want to have. - targetOutboundPeers uint32 - // The maximum number of inbound peers we allow. - maxInboundPeers uint32 - // When true, only one connection per IP is allowed. Prevents eclipse attacks - // among other things. - limitOneInboundConnectionPerIP bool // When --hypersync is set to true we will attempt fast block synchronization HyperSync bool @@ -135,10 +119,8 @@ type ConnectionManager struct { } func NewConnectionManager( - _params *DeSoParams, _addrMgr *addrmgr.AddrManager, _listeners []net.Listener, + _params *DeSoParams, _listeners []net.Listener, _connectIps []string, _timeSource chainlib.MedianTimeSource, - _targetOutboundPeers uint32, _maxInboundPeers uint32, - _limitOneInboundConnectionPerIP bool, _hyperSync bool, _syncType NodeSyncType, _stallTimeoutSeconds uint64, @@ -149,16 +131,13 @@ func NewConnectionManager( ValidateHyperSyncFlags(_hyperSync, _syncType) return &ConnectionManager{ - srv: _srv, - params: _params, - AddrMgr: _addrMgr, - listeners: _listeners, - connectIps: _connectIps, + srv: _srv, + params: _params, + listeners: _listeners, // We keep track of the last N nonces we've sent in order to detect // self connections. sentNonces: lru.NewCache(1000), timeSource: _timeSource, - //newestBlock: _newestBlock, // Initialize the peer data structures. 
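Reviewer's aside: the sentNonces cache retained above is what lets a node catch accidental self-connections. A minimal, self-contained sketch of the idea, using the same github.com/decred/dcrd/lru package this file imports (the flow and names here are illustrative, not the actual ConnectionManager code):

    package main

    import (
    	"fmt"
    	"math/rand"

    	"github.com/decred/dcrd/lru"
    )

    func main() {
    	// Remember the nonces of the last N version messages we sent.
    	sentNonces := lru.NewCache(1000)

    	// Outbound side: pick a nonce and record it before sending a version message.
    	nonce := rand.Uint64()
    	sentNonces.Add(nonce)

    	// Inbound side: if a peer's version message echoes a nonce we recorded,
    	// we dialed ourselves and should drop the connection.
    	if sentNonces.Contains(nonce) {
    		fmt.Println("self-connection detected; disconnecting")
    	}
    }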
@@ -176,25 +155,14 @@ func NewConnectionManager( donePeerChan: make(chan *Peer, 100), outboundConnectionChan: make(chan *outboundConnection, 100), - targetOutboundPeers: _targetOutboundPeers, - maxInboundPeers: _maxInboundPeers, - limitOneInboundConnectionPerIP: _limitOneInboundConnectionPerIP, - HyperSync: _hyperSync, - SyncType: _syncType, - serverMessageQueue: _serverMessageQueue, - stallTimeoutSeconds: _stallTimeoutSeconds, - minFeeRateNanosPerKB: _minFeeRateNanosPerKB, + HyperSync: _hyperSync, + SyncType: _syncType, + serverMessageQueue: _serverMessageQueue, + stallTimeoutSeconds: _stallTimeoutSeconds, + minFeeRateNanosPerKB: _minFeeRateNanosPerKB, } } -func (cmgr *ConnectionManager) GetAddrManager() *addrmgr.AddrManager { - return cmgr.AddrMgr -} - -func (cmgr *ConnectionManager) SetTargetOutboundPeers(numPeers uint32) { - cmgr.targetOutboundPeers = numPeers -} - // Check if the address passed shares a group with any addresses already in our // data structures. func (cmgr *ConnectionManager) isRedundantGroupKey(na *wire.NetAddress) bool { @@ -232,76 +200,6 @@ func (cmgr *ConnectionManager) subFromGroupKey(na *wire.NetAddress) { cmgr.mtxOutboundConnIPGroups.Unlock() } -func (cmgr *ConnectionManager) getRandomAddr() *wire.NetAddress { - for tries := 0; tries < 100; tries++ { - addr := cmgr.AddrMgr.GetAddress() - if addr == nil { - glog.V(2).Infof("ConnectionManager.getRandomAddr: addr from GetAddressWithExclusions was nil") - break - } - - // Lock the address map since multiple threads will be trying to read - // and modify it at the same time. - cmgr.mtxConnectedOutboundAddrs.RLock() - ok := cmgr.connectedOutboundAddrs[addrmgr.NetAddressKey(addr.NetAddress())] - cmgr.mtxConnectedOutboundAddrs.RUnlock() - if ok { - glog.V(2).Infof("ConnectionManager.getRandomAddr: Not choosing already connected address %v:%v", addr.NetAddress().IP, addr.NetAddress().Port) - continue - } - - // We can only have one outbound address per /16. This is similar to - // Bitcoin and we do it to prevent Sybil attacks. - if cmgr.isRedundantGroupKey(addr.NetAddress()) { - glog.V(2).Infof("ConnectionManager.getRandomAddr: Not choosing address due to redundant group key %v:%v", addr.NetAddress().IP, addr.NetAddress().Port) - continue - } - - glog.V(2).Infof("ConnectionManager.getRandomAddr: Returning %v:%v at %d iterations", - addr.NetAddress().IP, addr.NetAddress().Port, tries) - return addr.NetAddress() - } - - glog.V(2).Infof("ConnectionManager.getRandomAddr: Returning nil") - return nil -} - -func (cmgr *ConnectionManager) enoughOutboundPeers() bool { - val := atomic.LoadUint32(&cmgr.numOutboundPeers) - if val > cmgr.targetOutboundPeers { - glog.Errorf("enoughOutboundPeers: Connected to too many outbound "+ - "peers: (%d). Should be "+ - "no more than (%d).", val, cmgr.targetOutboundPeers) - return true - } - - if val == cmgr.targetOutboundPeers { - return true - } - return false -} - -func IPToNetAddr(ipStr string, addrMgr *addrmgr.AddrManager, params *DeSoParams) (*wire.NetAddress, error) { - port := params.DefaultSocketPort - host, portstr, err := net.SplitHostPort(ipStr) - if err != nil { - // No port specified so leave port=default and set - // host to the ipStr. 
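Aside: the fallback that follows (no port in the string means keep the default) is the standard net.SplitHostPort pattern. In isolation, with parseHostPort as a hypothetical stand-in for the relevant part of IPToNetAddr:

    package main

    import (
    	"fmt"
    	"net"
    	"strconv"
    )

    // parseHostPort mirrors IPToNetAddr's fallback: if ipStr carries no port,
    // keep the default; otherwise parse the port out of the string.
    func parseHostPort(ipStr string, defaultPort uint16) (string, uint16, error) {
    	host, portStr, err := net.SplitHostPort(ipStr)
    	if err != nil {
    		// No port specified, so use the default and treat ipStr as the host.
    		return ipStr, defaultPort, nil
    	}
    	pp, err := strconv.ParseUint(portStr, 10, 16)
    	if err != nil {
    		return "", 0, fmt.Errorf("invalid port in %q: %v", ipStr, err)
    	}
    	return host, uint16(pp), nil
    }

    func main() {
    	fmt.Println(parseHostPort("1.2.3.4", 17000))       // 1.2.3.4 17000 <nil>
    	fmt.Println(parseHostPort("1.2.3.4:18000", 17000)) // 1.2.3.4 18000 <nil>
    }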
- host = ipStr - } else { - pp, err := strconv.ParseUint(portstr, 10, 16) - if err != nil { - return nil, errors.Wrapf(err, "IPToNetAddr: Can not parse port from %s for ip", ipStr) - } - port = uint16(pp) - } - netAddr, err := addrMgr.HostToNetAddress(host, port, 0) - if err != nil { - return nil, errors.Wrapf(err, "IPToNetAddr: Can not parse port from %s for ip", ipStr) - } - return netAddr, nil -} - func (cmgr *ConnectionManager) IsConnectedOutboundIpAddress(netAddr *wire.NetAddress) bool { // Lock the address map since multiple threads will be trying to read // and modify it at the same time. @@ -353,15 +251,8 @@ func (cmgr *ConnectionManager) _createOutboundConnection(addr *wire.NetAddress, // is set, then we will connect only to that addr. Otherwise, we will use // the addrmgr to randomly select addrs and create OUTBOUND connections // with them until we find a worthy peer. -func (cmgr *ConnectionManager) ConnectPeer(conn net.Conn, attemptId uint64, isOutbound bool, isPersistent bool) error { - // At this point Conn is set so create a peer object to do - // a version negotiation. - ipStr := conn.RemoteAddr().String() - na, err := IPToNetAddr(ipStr, cmgr.AddrMgr, cmgr.params) - if err != nil { - return errors.Wrapf(err, "ConnectOutboundConnection: Problem calling ipToNetAddr for addr: (%s)", conn.RemoteAddr().String()) - } - +func (cmgr *ConnectionManager) ConnectPeer(conn net.Conn, na *wire.NetAddress, attemptId uint64, isOutbound bool, isPersistent bool) *Peer { + // At this point Conn is set so create a peer object to do a version negotiation. id := atomic.AddUint64(&cmgr.peerIndex, 1) peer := NewPeer(id, attemptId, conn, isOutbound, na, isPersistent, cmgr.stallTimeoutSeconds, @@ -370,34 +261,33 @@ func (cmgr *ConnectionManager) ConnectPeer(conn net.Conn, attemptId uint64, isOu cmgr.srv.incomingMessages, cmgr, cmgr.srv, cmgr.SyncType, cmgr.newPeerChan, cmgr.donePeerChan) - cmgr.newPeerChan <- peer - return nil + // Now we can add the peer to our data structures. + peer._logAddPeer() + cmgr.addPeer(peer) + + // Start the peer's message loop. + peer.Start() + + // FIXME: Move this earlier + // Signal the server about the new Peer in case it wants to do something with it. + go func() { + cmgr.serverMessageQueue <- &ServerMessage{ + Peer: peer, + Msg: &MsgDeSoNewPeer{}, + } + }() + + return peer } -func (cmgr *ConnectionManager) _isFromRedundantInboundIPAddress(addrToCheck net.Addr) bool { +func (cmgr *ConnectionManager) _isFromRedundantInboundIPAddress(netAddr *wire.NetAddress) bool { cmgr.mtxPeerMaps.RLock() defer cmgr.mtxPeerMaps.RUnlock() // Loop through all the peers to see if any have the same IP // address. This map is normally pretty small so doing this // every time a Peer connects should be fine. - netAddr, err := IPToNetAddr(addrToCheck.String(), cmgr.AddrMgr, cmgr.params) - if err != nil { - // Return true in case we have an error. We do this because it - // will result in the peer connection not being accepted, which - // is desired in this case. - glog.Warningf(errors.Wrapf(err, - "ConnectionManager._isFromRedundantInboundIPAddress: Problem parsing "+ - "net.Addr to wire.NetAddress so marking as redundant and not "+ - "making connection").Error()) - return true - } - if netAddr == nil { - glog.Warningf("ConnectionManager._isFromRedundantInboundIPAddress: " + - "address was nil after parsing so marking as redundant and not " + - "making connection") - return true - } + // If the IP is a localhost IP let it slide. This is useful for testing fake // nodes on a local machine. 
// TODO: Should this be a flag? @@ -437,37 +327,6 @@ func (cmgr *ConnectionManager) _handleInboundConnections() { continue } - // As a quick check, reject the peer if we have too many already. Note that - // this check isn't perfect but we have a later check at the end after doing - // a version negotiation that will properly reject the peer if this check - // messes up e.g. due to a concurrency issue. - // - // TODO: We should instead have eviction logic here to prevent - // someone from monopolizing a node's inbound connections. - numInboundPeers := atomic.LoadUint32(&cmgr.numInboundPeers) - if numInboundPeers > cmgr.maxInboundPeers { - - glog.Infof("Rejecting INBOUND peer (%s) due to max inbound peers (%d) hit.", - conn.RemoteAddr().String(), cmgr.maxInboundPeers) - conn.Close() - - continue - } - - // If we want to limit inbound connections to one per IP address, check to - // make sure this address isn't already connected. - if cmgr.limitOneInboundConnectionPerIP && - cmgr._isFromRedundantInboundIPAddress(conn.RemoteAddr()) { - - glog.Infof("Rejecting INBOUND peer (%s) due to already having an "+ - "inbound connection from the same IP with "+ - "limit_one_inbound_connection_per_ip set.", - conn.RemoteAddr().String()) - conn.Close() - - continue - } - cmgr.inboundConnectionChan <- &inboundConnection{ connection: conn, } @@ -621,28 +480,14 @@ func (cmgr *ConnectionManager) removePeer(pp *Peer) { // Update the last seen time before we finish removing the peer. // TODO: Really, we call 'Connected()' on removing a peer? // I can't find a Disconnected() but seems odd. - cmgr.AddrMgr.Connected(pp.netAddr) + // FIXME: Move this to Done Peer + //cmgr.AddrMgr.Connected(pp.netAddr) // Remove the peer from our data structure. delete(peerList, pp.ID) delete(cmgr.connectedPeers, pp.ID) } -func (cmgr *ConnectionManager) _maybeReplacePeer(pp *Peer) { - // If the peer was outbound, replace her with a - // new peer to maintain a fixed number of outbound connections. - if pp.isOutbound { - // If the peer is not persistent then we don't want to pass an - // address to connectPeer. The lack of an address will cause it - // to choose random addresses from the addrmgr until one works. - na := pp.netAddr - if !pp.isPersistent { - na = nil - } - cmgr._createOutboundConnection(na, pp.isPersistent) - } -} - func (cmgr *ConnectionManager) _logOutboundPeerData() { numOutboundPeers := int(atomic.LoadUint32(&cmgr.numOutboundPeers)) numInboundPeers := int(atomic.LoadUint32(&cmgr.numInboundPeers)) @@ -720,68 +565,6 @@ func (cmgr *ConnectionManager) Start() { Connection: ic, }, } - case pp := <-cmgr.newPeerChan: - { - // We have successfully connected to a peer and it passed its version - // negotiation. - - // if this is a non-persistent outbound peer and we already have enough - // outbound peers, then don't bother adding this one. - if !pp.isPersistent && pp.isOutbound && cmgr.enoughOutboundPeers() { - // TODO: Make this less verbose - glog.V(1).Infof("Dropping peer because we already have enough outbound peer connections.") - pp.Conn.Close() - continue - } - - // If this is a non-persistent outbound peer and the group key - // overlaps with another peer we're already connected to then - // abort mission. We only connect to one peer per IP group in - // order to prevent Sybil attacks. 
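Aside for reviewers unfamiliar with the /16 group-key rule used below: btcd's addrmgr.GroupKey buckets routable IPv4 addresses by their /16 prefix, so two addresses in the same /16 produce the same key and only one of them gets a non-persistent outbound slot. A small sketch (the IPs are arbitrary examples):

    package main

    import (
    	"fmt"
    	"net"

    	"github.com/btcsuite/btcd/addrmgr"
    	"github.com/btcsuite/btcd/wire"
    )

    func main() {
    	na1 := wire.NewNetAddressIPPort(net.ParseIP("64.233.1.10"), 17000, 0)
    	na2 := wire.NewNetAddressIPPort(net.ParseIP("64.233.250.7"), 17000, 0)
    	na3 := wire.NewNetAddressIPPort(net.ParseIP("93.184.216.34"), 17000, 0)

    	// Same /16, so the group keys collide.
    	fmt.Println(addrmgr.GroupKey(na1) == addrmgr.GroupKey(na2)) // true
    	// Different /16, so this one is allowed alongside na1.
    	fmt.Println(addrmgr.GroupKey(na1) == addrmgr.GroupKey(na3)) // false
    }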
- if pp.isOutbound && - !pp.isPersistent && - cmgr.isRedundantGroupKey(pp.netAddr) { - - // TODO: Make this less verbose - glog.Infof("Rejecting OUTBOUND NON-PERSISTENT peer (%v) with "+ - "redundant group key (%s).", - pp, addrmgr.GroupKey(pp.netAddr)) - - pp.Conn.Close() - cmgr._maybeReplacePeer(pp) - continue - } - - // Check that we have not exceeded the maximum number of inbound - // peers allowed. - // - // TODO: We should instead have eviction logic to prevent - // someone from monopolizing a node's inbound connections. - numInboundPeers := atomic.LoadUint32(&cmgr.numInboundPeers) - if !pp.isOutbound && numInboundPeers > cmgr.maxInboundPeers { - - // TODO: Make this less verbose - glog.Infof("Rejecting INBOUND peer (%v) due to max inbound peers (%d) hit.", - pp, cmgr.maxInboundPeers) - - pp.Conn.Close() - continue - } - - // Now we can add the peer to our data structures. - pp._logAddPeer() - cmgr.addPeer(pp) - - // Start the peer's message loop. - pp.Start() - - // Signal the server about the new Peer in case it wants to do something with it. - cmgr.serverMessageQueue <- &ServerMessage{ - Peer: pp, - Msg: &MsgDeSoNewPeer{}, - } - - } case pp := <-cmgr.donePeerChan: { // By the time we get here, it can be assumed that the Peer's Disconnect function @@ -795,7 +578,6 @@ func (cmgr *ConnectionManager) Start() { // Potentially replace the peer. For example, if the Peer was an outbound Peer // then we want to find a new peer in order to maintain our TargetOutboundPeers. - cmgr._maybeReplacePeer(pp) // Signal the server about the Peer being done in case it wants to do something // with it. diff --git a/lib/constants.go b/lib/constants.go index d6dbf1252..7a565bb6f 100644 --- a/lib/constants.go +++ b/lib/constants.go @@ -561,6 +561,9 @@ type DeSoParams struct { // The amount of time we wait to receive a version message from a peer. VersionNegotiationTimeout time.Duration + // The maximum number of addresses to broadcast to peers. + MaxAddressesToBroadcast uint32 + // The genesis block to use as the base of our chain. GenesisBlock *MsgDeSoBlock // The expected hash of the genesis block. 
Should align with what one @@ -1019,6 +1022,8 @@ var DeSoMainnetParams = DeSoParams{ DialTimeout: 30 * time.Second, VersionNegotiationTimeout: 30 * time.Second, + MaxAddressesToBroadcast: 10, + BlockRewardMaturity: time.Hour * 3, V1DifficultyAdjustmentFactor: 10, @@ -1288,6 +1293,8 @@ var DeSoTestnetParams = DeSoParams{ DialTimeout: 30 * time.Second, VersionNegotiationTimeout: 30 * time.Second, + MaxAddressesToBroadcast: 10, + GenesisBlock: &GenesisBlock, GenesisBlockHashHex: GenesisBlockHashHex, diff --git a/lib/network.go b/lib/network.go index 2c81c5947..6a90c91a1 100644 --- a/lib/network.go +++ b/lib/network.go @@ -17,11 +17,10 @@ import ( "strings" "time" - "github.com/deso-protocol/core/collections/bitset" - "github.com/golang/glog" - "github.com/decred/dcrd/dcrec/secp256k1/v4" + "github.com/golang/glog" + "github.com/deso-protocol/core/collections/bitset" "github.com/deso-protocol/core/consensus" "github.com/btcsuite/btcd/btcec" diff --git a/lib/network_test.go b/lib/network_test.go index 5c8191b2b..452d59318 100644 --- a/lib/network_test.go +++ b/lib/network_test.go @@ -76,13 +76,13 @@ func TestVerack(t *testing.T) { var buf bytes.Buffer nonce := uint64(12345678910) - _, err := WriteMessage(&buf, &MsgDeSoVerack{Nonce: nonce}, networkType) + _, err := WriteMessage(&buf, &MsgDeSoVerack{Version: VerackVersion0, NonceReceived: nonce}, networkType) require.NoError(err) verBytes := buf.Bytes() testMsg, _, err := ReadMessage(bytes.NewReader(verBytes), networkType) require.NoError(err) - require.Equal(&MsgDeSoVerack{Nonce: nonce}, testMsg) + require.Equal(&MsgDeSoVerack{Version: VerackVersion0, NonceReceived: nonce}, testMsg) } var expectedBlockHeaderVersion1 = &MsgDeSoHeader{ diff --git a/lib/peer.go b/lib/peer.go index 24bf5e291..9a0aac262 100644 --- a/lib/peer.go +++ b/lib/peer.go @@ -806,7 +806,7 @@ func (pp *Peer) IsOutbound() bool { return pp.isOutbound } -func (pp *Peer) IsPersistend() bool { +func (pp *Peer) IsPersistent() bool { return pp.isPersistent } @@ -1208,11 +1208,12 @@ func (pp *Peer) Start() { // If the address manager needs more addresses, then send a GetAddr message // to the peer. This is best-effort. if pp.cmgr != nil { - if pp.cmgr.AddrMgr.NeedMoreAddresses() { + // TODO: Move this to ConnectionController. + /*if pp.cmgr.AddrMgr.NeedMoreAddresses() { go func() { pp.QueueMessage(&MsgDeSoGetAddr{}) }() - } + }*/ } // Send our verack message now that the IO processing machinery has started. diff --git a/lib/pos_connection_controller.go b/lib/pos_connection_controller.go new file mode 100644 index 000000000..c288775d7 --- /dev/null +++ b/lib/pos_connection_controller.go @@ -0,0 +1,723 @@ +package lib + +import ( + "fmt" + "github.com/btcsuite/btcd/addrmgr" + "github.com/btcsuite/btcd/wire" + "github.com/davecgh/go-spew/spew" + "github.com/deso-protocol/core/bls" + "github.com/deso-protocol/go-deadlock" + "github.com/golang/glog" + "github.com/pkg/errors" + "net" + "strconv" + "sync" + "time" +) + +type ConnectionController struct { + // The parameters we are initialized with. + params *DeSoParams + + server *Server + signer *BLSSigner + + handshake *HandshakeController + + rniManager *RemoteNodeIndexerManager + + validatorMapLock sync.Mutex + getActiveValidators func() map[bls.PublicKey]*ValidatorEntry + + // The address manager keeps track of peer addresses we're aware of. When + // we need to connect to a new outbound peer, it chooses one of the addresses + // it's aware of at random and provides it to us. 
+ AddrMgr *addrmgr.AddrManager + + // addrsToBroadcast is a list of all the addresses we've received from valid addr + // messages that we intend to broadcast to our peers. It is organized as: + // -> . + // + // It is organized in this way so that we can limit the number of addresses we + // are distributing for a single peer to avoid a DOS attack. + addrsToBroadcastLock deadlock.RWMutex + addrsToBroadcast map[string][]*SingleAddr + + // When --connectips is set, we don't connect to anything from the addrmgr. + connectIps []string + + // The target number of outbound peers we want to have. + targetOutboundPeers uint32 + // The maximum number of inbound peers we allow. + maxInboundPeers uint32 + // When true, only one connection per IP is allowed. Prevents eclipse attacks + // among other things. + limitOneInboundConnectionPerIP bool + + startGroup sync.WaitGroup + exitChan chan struct{} + exitGroup sync.WaitGroup +} + +func NewConnectionController(params *DeSoParams, server *Server, rniManager *RemoteNodeIndexerManager, signer *BLSSigner, + addrMgr *addrmgr.AddrManager, targetOutboundPeers uint32, maxInboundPeers uint32, + limitOneInboundConnectionPerIP bool) *ConnectionController { + + return &ConnectionController{ + params: params, + server: server, + signer: signer, + rniManager: rniManager, + AddrMgr: addrMgr, + addrsToBroadcast: make(map[string][]*SingleAddr), + targetOutboundPeers: targetOutboundPeers, + maxInboundPeers: maxInboundPeers, + limitOneInboundConnectionPerIP: limitOneInboundConnectionPerIP, + exitChan: make(chan struct{}), + } +} + +func (cc *ConnectionController) Start() { + cc.startGroup.Add(3) + // Start the validator connector + go cc.startValidatorConnector() + + cc.startGroup.Wait() + cc.exitGroup.Add(3) +} + +func (cc *ConnectionController) Stop() { + close(cc.exitChan) + cc.exitGroup.Wait() +} + +func (cc *ConnectionController) initiatePersistentConnections() { + // This is a hack to make outbound connections go away. + if cc.targetOutboundPeers == 0 { + return + } + if len(cc.connectIps) > 0 { + // Connect to addresses passed via the --connect-ips flag. These addresses + // are persistent in the sense that if we disconnect from one, we will + // try to reconnect to the same one. + for _, connectIp := range cc.connectIps { + if err := cc.createPersistentOutboundConnection(connectIp); err != nil { + glog.Errorf("ConnectionController.initiatePersistentConnections: Problem connecting "+ + "to connectIp %v: %v", connectIp, err) + } + } + } +} + +func (cc *ConnectionController) startValidatorConnector() { + cc.startGroup.Done() + for { + select { + case <-cc.exitChan: + cc.exitGroup.Done() + return + case <-time.After(1 * time.Minute): + cc.validatorMapLock.Lock() + activeValidatorsMap := cc.getActiveValidators() + cc.refreshValidatorIndex(activeValidatorsMap) + cc.refreshValidatorAttemptedIndex(activeValidatorsMap) + cc.connectValidators(activeValidatorsMap) + cc.validatorMapLock.Unlock() + } + } +} + +func (cc *ConnectionController) startPeerConnector() { + cc.startGroup.Done() + + for { + select { + case <-cc.exitChan: + cc.exitGroup.Done() + return + case <-time.After(1 * time.Second): + // Only connect to addresses from the addrmgr if we don't specify --connect-ips. + // These addresses are *not* persistent, meaning if we disconnect from one we'll + // try a different one. + // TODO: Do we still want this? 
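Review note on Start() above: it calls startGroup.Add(3) but only launches startValidatorConnector, so as written startGroup.Wait() appears to block unless startPeerConnector and startAddressRelayer are also spun up. The intended start/stop choreography, sketched with all three workers (a simplified stand-in, not the actual controller):

    package main

    import (
    	"fmt"
    	"sync"
    	"time"
    )

    type controller struct {
    	startGroup sync.WaitGroup
    	exitChan   chan struct{}
    	exitGroup  sync.WaitGroup
    }

    func (c *controller) loop(name string, tick time.Duration) {
    	c.startGroup.Done() // signal that this worker is running
    	for {
    		select {
    		case <-c.exitChan:
    			c.exitGroup.Done()
    			return
    		case <-time.After(tick):
    			fmt.Println(name, "tick")
    		}
    	}
    }

    func main() {
    	c := &controller{exitChan: make(chan struct{})}
    	// One Add per worker; Start blocks until all of them are running.
    	c.startGroup.Add(3)
    	go c.loop("validator-connector", 50*time.Millisecond)
    	go c.loop("peer-connector", 50*time.Millisecond)
    	go c.loop("address-relayer", 50*time.Millisecond)
    	c.startGroup.Wait()
    	c.exitGroup.Add(3)

    	time.Sleep(120 * time.Millisecond)
    	close(c.exitChan) // Stop: signal exit and wait for the workers to drain
    	c.exitGroup.Wait()
    }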
+ if len(cc.connectIps) == 0 { + continue + } + + cc.refreshOutboundIndex() + cc.refreshInboundIndex() + cc.connectPeers() + } + } +} + +// Must be run inside a goroutine. Relays addresses to peers at regular intervals +// and relays our own address to peers once every 24 hours. +func (cc *ConnectionController) startAddressRelayer() { + cc.startGroup.Done() + numMinutesPassed := 0 + for { + select { + case <-cc.exitChan: + cc.exitGroup.Done() + return + case <-time.After(AddrRelayIntervalSeconds * time.Second): + // For the first ten minutes after the connection controller starts, relay our address to all + // peers. After the first ten minutes, do it once every 24 hours. + glog.V(1).Infof("ConnectionController.startAddressRelayer: Relaying our own addr to peers") + if numMinutesPassed < 10 || numMinutesPassed%(RebroadcastNodeAddrIntervalMinutes) == 0 { + // TODO: Change to retrieve all RemoteNodes from the indexer. + for _, pp := range cc.server.GetAllPeers() { + bestAddress := cc.AddrMgr.GetBestLocalAddress(pp.netAddr) + if bestAddress != nil { + glog.V(2).Infof("ConnectionController.startAddressRelayer: Relaying address %v to "+ + "peer %v", bestAddress.IP.String(), pp) + if err := cc.server.SendMessage(&MsgDeSoAddr{ + AddrList: []*SingleAddr{ + { + Timestamp: time.Now(), + IP: bestAddress.IP, + Port: bestAddress.Port, + Services: (ServiceFlag)(bestAddress.Services), + }, + }, + }, pp.ID); err != nil { + glog.Errorf("ConnectionController.startAddressRelayer: Problem sending "+ + "MsgDeSoAddr to peer %v: %v", pp, err) + } + } + } + } + + glog.V(2).Infof("ConnectionController.startAddressRelayer: Seeing if there are addrs to relay...") + // Broadcast the addrs we have to all of our peers. + addrsToBroadcast := cc.getAddrsToBroadcast() + if len(addrsToBroadcast) == 0 { + glog.V(2).Infof("ConnectionController.startAddressRelayer: No addrs to relay.") + time.Sleep(AddrRelayIntervalSeconds * time.Second) + continue + } + + glog.V(2).Infof("ConnectionController.startAddressRelayer: Found %d addrs to "+ + "relay: %v", len(addrsToBroadcast), spew.Sdump(addrsToBroadcast)) + // Iterate over all our peers and broadcast the addrs to all of them. + for _, pp := range cc.server.GetAllPeers() { + pp.AddDeSoMessage(&MsgDeSoAddr{ + AddrList: addrsToBroadcast, + }, false) + } + time.Sleep(AddrRelayIntervalSeconds * time.Second) + numMinutesPassed++ + } + } +} + +// ########################### +// ## Handlers (Peer, DeSoMessage) +// ########################### + +func (cc *ConnectionController) _handleDonePeerMessage(origin *Peer, desoMsg DeSoMessage) { + if desoMsg.GetMsgType() != MsgTypeDonePeer { + return + } + + cc.rniManager.RemovePeer(origin) +} + +func (cc *ConnectionController) _handleAddrMessage(origin *Peer, desoMsg DeSoMessage) { + if desoMsg.GetMsgType() != MsgTypeAddr { + return + } + + var msg *MsgDeSoAddr + var ok bool + if msg, ok = desoMsg.(*MsgDeSoAddr); !ok { + glog.Errorf("ConnectionController._handleAddrMessage: Problem decoding "+ + "MsgDeSoAddr: %v", spew.Sdump(desoMsg)) + cc.rniManager.DisconnectPeer(origin) + return + } + + cc.addrsToBroadcastLock.Lock() + defer cc.addrsToBroadcastLock.Unlock() + + glog.V(1).Infof("ConnectionController._handleAddrMessage: Received Addr from peer %v with addrs %v", origin, spew.Sdump(msg.AddrList)) + + // If this addr message contains more than the maximum allowed number of addresses + // then disconnect this peer. 
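Aside: the self-address relay cadence in startAddressRelayer above ("first ten minutes, then once every 24 hours") reduces to a small predicate. A sketch, assuming RebroadcastNodeAddrIntervalMinutes is 24*60 and that numMinutesPassed advances roughly once per minute:

    package main

    import "fmt"

    // shouldRelaySelf mirrors the relayer's schedule: relay our own address
    // every iteration for the first ten minutes, then once per rebroadcast
    // interval thereafter.
    func shouldRelaySelf(minutesPassed, rebroadcastIntervalMinutes int) bool {
    	return minutesPassed < 10 || minutesPassed%rebroadcastIntervalMinutes == 0
    }

    func main() {
    	for _, m := range []int{0, 5, 10, 1440, 2880} {
    		fmt.Println(m, shouldRelaySelf(m, 24*60))
    	}
    	// 0 true, 5 true, 10 false, 1440 true, 2880 true
    }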
+ if len(msg.AddrList) > MaxAddrsPerAddrMsg { + glog.Errorf(fmt.Sprintf("ConnectionController._handleAddrMessage: Disconnecting "+ + "Peer %v for sending us an addr message with %d transactions, which exceeds "+ + "the max allowed %d", + origin, len(msg.AddrList), MaxAddrsPerAddrMsg)) + + cc.rniManager.DisconnectPeer(origin) + return + } + + // Add all the addresses we received to the addrmgr. + netAddrsReceived := []*wire.NetAddress{} + for _, addr := range msg.AddrList { + addrAsNetAddr := wire.NewNetAddressIPPort(addr.IP, addr.Port, (wire.ServiceFlag)(addr.Services)) + if !addrmgr.IsRoutable(addrAsNetAddr) { + glog.V(1).Infof("Dropping address %v from peer %v because it is not routable", addr, origin) + continue + } + + netAddrsReceived = append( + netAddrsReceived, addrAsNetAddr) + } + cc.AddrMgr.AddAddresses(netAddrsReceived, origin.netAddr) + + // If the message had <= 10 addrs in it, then queue all the addresses for relaying + // on the next cycle. + if len(msg.AddrList) <= 10 { + glog.V(1).Infof("ConnectionController._handleAddrMessage: Queueing %d addrs for forwarding from "+ + "peer %v", len(msg.AddrList), origin) + sourceAddr := &SingleAddr{ + Timestamp: time.Now(), + IP: origin.netAddr.IP, + Port: origin.netAddr.Port, + Services: origin.serviceFlags, + } + listToAddTo, hasSeenSource := cc.addrsToBroadcast[sourceAddr.StringWithPort(false /*includePort*/)] + if !hasSeenSource { + listToAddTo = []*SingleAddr{} + } + // If this peer has been sending us a lot of little crap, evict a lot of their + // stuff but don't disconnect. + if len(listToAddTo) > MaxAddrsPerAddrMsg { + listToAddTo = listToAddTo[:MaxAddrsPerAddrMsg/2] + } + listToAddTo = append(listToAddTo, msg.AddrList...) + cc.addrsToBroadcast[sourceAddr.StringWithPort(false /*includePort*/)] = listToAddTo + } +} + +func (cc *ConnectionController) _handleGetAddrMessage(origin *Peer, desoMsg DeSoMessage) { + if desoMsg.GetMsgType() != MsgTypeGetAddr { + return + } + + if _, ok := desoMsg.(*MsgDeSoGetAddr); !ok { + glog.Errorf("ConnectionController._handleAddrMessage: Problem decoding "+ + "MsgDeSoAddr: %v", spew.Sdump(desoMsg)) + cc.rniManager.DisconnectPeer(origin) + return + } + + glog.V(1).Infof("Server._handleGetAddrMessage: Received GetAddr from peer %v", origin) + // When we get a GetAddr message, choose MaxAddrsPerMsg from the AddrMgr + // and send them back to the peer. + netAddrsFound := cc.AddrMgr.AddressCache() + if len(netAddrsFound) > MaxAddrsPerAddrMsg { + netAddrsFound = netAddrsFound[:MaxAddrsPerAddrMsg] + } + + // Convert the list to a SingleAddr list. 
+ res := &MsgDeSoAddr{} + for _, netAddr := range netAddrsFound { + singleAddr := &SingleAddr{ + Timestamp: time.Now(), + IP: netAddr.IP, + Port: netAddr.Port, + Services: (ServiceFlag)(netAddr.Services), + } + res.AddrList = append(res.AddrList, singleAddr) + } + cc.rniManager.SendMessageToPeer(origin, res) +} + +func (cc *ConnectionController) _handleNewConnectionMessage(origin *Peer, desoMsg DeSoMessage) { + if desoMsg.GetMsgType() != MsgTypeNewConnection { + return + } + + var msg *MsgDeSoNewConnection + var ok bool + if msg, ok = desoMsg.(*MsgDeSoNewConnection); !ok { + return + } + + switch msg.Connection.GetConnectionType() { + case ConnectionTypeInbound: + if err := cc.processInboundConnection(msg.Connection); err != nil { + glog.Errorf("ConnectionController.handleNewConnectionMessage: Problem handling inbound connection: %v", err) + msg.Connection.Close() + return + } + case ConnectionTypeOutbound: + if err := cc.processOutboundConnection(msg.Connection); err != nil { + glog.Errorf("ConnectionController.handleNewConnectionMessage: Problem handling outbound connection: %v", err) + var oc *outboundConnection + if oc, ok = msg.Connection.(*outboundConnection); !ok { + return + } + id := NewRemoteNodeAttemptedId(oc.attemptId) + cc.rniManager.RemoveNonValidatorAttempted(id) + cc.server.RemoveAttemptedOutboundAddrs(oc.address) + msg.Connection.Close() + return + } + } +} + +// ########################### +// ## Validator Connections +// ########################### + +func (cc *ConnectionController) refreshValidatorIndex(activeValidatorsMap map[bls.PublicKey]*ValidatorEntry) { + // De-index inactive validators. + validatorRemoteNodeMap := cc.rniManager.GetRemoteNodeIndexer().GetValidatorIndex().GetIndex() + for pk, rn := range validatorRemoteNodeMap { + if _, ok := activeValidatorsMap[pk]; !ok { + cc.rniManager.UnsetValidator(pk, rn) + } + } + + // Look for validators in our existing outbound / inbound connections. + allNonValidators := cc.rniManager.GetAllNonValidators() + for _, rn := range allNonValidators { + meta := rn.GetHandshakeMetadata() + if meta == nil { + continue + } + pk := meta.GetValidatorPublicKey() + if _, ok := activeValidatorsMap[pk]; ok { + cc.rniManager.SetValidator(pk, rn) + } + } +} + +func (cc *ConnectionController) refreshValidatorAttemptedIndex(activeValidatorsMap map[bls.PublicKey]*ValidatorEntry) { + // Disconnect inactive attempted validators. + validatorRemoteNodeMap := cc.rniManager.GetRemoteNodeIndexer().GetValidatorAttemptedIndex().GetIndex() + for pk, rn := range validatorRemoteNodeMap { + if _, ok := activeValidatorsMap[pk]; !ok { + cc.rniManager.Disconnect(rn) + } + } +} + +func (cc *ConnectionController) connectValidators(activeValidatorsMap map[bls.PublicKey]*ValidatorEntry) { + for pk, validator := range activeValidatorsMap { + _, connected := cc.rniManager.GetRemoteNodeIndexer().GetValidatorIndex().Get(pk) + _, attempted := cc.rniManager.GetRemoteNodeIndexer().GetValidatorAttemptedIndex().Get(pk) + if !connected && !attempted { + // FIXME: for now we'll only use the first address in the ValidatorEntry + address := string(validator.Domains[0]) + if err := cc.createValidatorConnection(address, pk); err != nil { + // TODO: Do we want to log an error here? 
+ continue + } + } + } +} + +// ########################### +// ## Peer Connections +// ########################### + +func (cc *ConnectionController) connectPeers() { + numConnectedOutboundPeers := cc.rniManager.GetNumConnectedOutboundPeers() + numAttemptedPeers := cc.rniManager.GetNumAttemptedNonValidators() + + remainingOutboundPeers := uint32(0) + if numConnectedOutboundPeers+numAttemptedPeers < cc.targetOutboundPeers { + remainingOutboundPeers = cc.targetOutboundPeers - (numConnectedOutboundPeers + numAttemptedPeers) + } + for ii := uint32(0); ii < remainingOutboundPeers; ii++ { + addr := cc.getRandomUnconnectedAddress() + cc.AddrMgr.Attempt(addr) + cc.rniManager.CreateOutboundConnectionNetAddress(addr) + } +} + +func (cc *ConnectionController) refreshOutboundIndex() { + numConnectedOutboundPeers := cc.rniManager.GetNumConnectedOutboundPeers() + numAttemptedPeers := cc.rniManager.GetNumAttemptedNonValidators() + + excessiveOutboundPeers := uint32(0) + if numConnectedOutboundPeers+numAttemptedPeers > cc.targetOutboundPeers { + excessiveOutboundPeers = numConnectedOutboundPeers + numAttemptedPeers - cc.targetOutboundPeers + } + // Disconnect random outbound peers if we have too many peers. + for ii := uint32(0); ii < excessiveOutboundPeers; ii++ { + rn, ok := cc.rniManager.GetRemoteNodeIndexer().GetNonValidatorOutboundIndex().GetRandom() + if !ok { + break + } + cc.rniManager.Disconnect(rn) + } +} + +func (cc *ConnectionController) refreshInboundIndex() { + numConnectedInboundPeers := cc.rniManager.GetNumConnectedInboundPeers() + + excessiveInboundPeers := uint32(0) + if numConnectedInboundPeers > cc.maxInboundPeers { + excessiveInboundPeers = numConnectedInboundPeers - cc.maxInboundPeers + } + // Disconnect random inbound peers if we have too many peers. + for ii := uint32(0); ii < excessiveInboundPeers; ii++ { + rn, ok := cc.rniManager.GetRemoteNodeIndexer().GetNonValidatorInboundIndex().GetRandom() + if !ok { + break + } + cc.rniManager.Disconnect(rn) + } +} + +func (cc *ConnectionController) getRandomUnconnectedAddress() *wire.NetAddress { + for tries := 0; tries < 100; tries++ { + addr := cc.AddrMgr.GetAddress() + if addr == nil { + //glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: addr from GetAddressWithExclusions was nil") + break + } + + if cc.server.IsConnectedOutboundIpAddress(addr.NetAddress()) { + //glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: Not choosing address due to redundancy %v:%v", addr.NetAddress().IP, addr.NetAddress().Port) + continue + } + + if cc.server.IsAttemptedOutboundIpAddress(addr.NetAddress()) { + continue + } + + // We can only have one outbound address per /16. This is similar to + // Bitcoin and we do it to prevent Sybil attacks. 
+ if cc.server.IsFromRedundantOutboundIPAddress(addr.NetAddress()) { + //glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: Not choosing address due to redundant group key %v:%v", addr.NetAddress().IP, addr.NetAddress().Port) + continue + } + + return addr.NetAddress() + } + + //glog.V(2).Infof("ConnectionManager.getRandomAddr: Returning nil") + return nil +} + +func (cc *ConnectionController) SetTargetOutboundPeers(numPeers uint32) { + cc.targetOutboundPeers = numPeers +} + +func (cc *ConnectionController) enoughInboundPeers() bool { + return cc.rniManager.GetNumConnectedInboundPeers() >= cc.maxInboundPeers +} + +func (cc *ConnectionController) enoughOutboundPeers() bool { + return cc.rniManager.GetNumConnectedOutboundPeers() >= cc.targetOutboundPeers +} + +func (cc *ConnectionController) processInboundConnection(conn Connection) error { + var ic *inboundConnection + var ok bool + if ic, ok = conn.(*inboundConnection); !ok { + return fmt.Errorf("ConnectionController.handleInboundConnection: Connection is not an inboundConnection") + } + + // As a quick check, reject the peer if we have too many already. Note that + // this check isn't perfect but we have a later check at the end after doing + // a version negotiation that will properly reject the peer if this check + // messes up e.g. due to a concurrency issue. + // + // TODO: We should instead have eviction logic here to prevent + // someone from monopolizing a node's inbound connections. + if cc.enoughInboundPeers() { + return fmt.Errorf("ConnectionController.handleInboundConnection: Rejecting INBOUND peer (%s) due to max "+ + "inbound peers (%d) hit", ic.connection.RemoteAddr().String(), cc.maxInboundPeers) + } + + // If we want to limit inbound connections to one per IP address, check to + // make sure this address isn't already connected. + if cc.limitOneInboundConnectionPerIP && + cc.isFromRedundantInboundIPAddress(ic.connection.RemoteAddr()) { + + return fmt.Errorf("ConnectionController.handleInboundConnection: Rejecting INBOUND peer (%s) due to already having an "+ + "inbound connection from the same IP with limit_one_inbound_connection_per_ip set", + ic.connection.RemoteAddr().String()) + } + + na, err := cc.ConvertIPStringToNetAddress(ic.connection.RemoteAddr().String()) + if err != nil { + return errors.Wrapf(err, "ConnectionController.handleInboundConnection: Problem calling ipToNetAddr "+ + "for addr: (%s)", ic.connection.RemoteAddr().String()) + } + + remoteNode := NewRemoteNode() + if err := remoteNode.ConnectInboundPeer(ic.connection, na); err != nil { + return errors.Wrapf(err, "ConnectionController.handleInboundConnection: Problem calling ConnectInboundPeer "+ + "for addr: (%s)", ic.connection.RemoteAddr().String()) + } + cc.rniManager.AddRemoteNode(remoteNode) + cc.rniManager.SetNonValidatorInbound(remoteNode) + + return nil +} + +func (cc *ConnectionController) processOutboundConnection(conn Connection) error { + var oc *outboundConnection + var ok bool + if oc, ok = conn.(*outboundConnection); !ok { + return fmt.Errorf("ConnectionController.handleOutboundConnection: Connection is not an outboundConnection") + } + + if oc.failed { + return fmt.Errorf("ConnectionController.handleOutboundConnection: Failed to connect to peer (%s)", oc.address.IP.String()) + } + + if !oc.isPersistent { + cc.AddrMgr.Connected(oc.address) + cc.AddrMgr.Good(oc.address) + } + + // if this is a non-persistent outbound peer and we already have enough + // outbound peers, then don't bother adding this one. 
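Aside: the peer counters in connectPeers and refreshOutboundIndex above are uint32, which is why both functions compare before subtracting. A sketch of the underflow hazard the guard avoids (remainingSlots is a hypothetical stand-in for the connectPeers arithmetic):

    package main

    import "fmt"

    // remainingSlots mirrors connectPeers: with unsigned counters, subtracting
    // before checking would wrap around, so guard the comparison first.
    func remainingSlots(connected, attempted, target uint32) uint32 {
    	if connected+attempted < target {
    		return target - (connected + attempted)
    	}
    	return 0
    }

    func main() {
    	fmt.Println(remainingSlots(3, 2, 8)) // 3 slots left
    	fmt.Println(remainingSlots(7, 4, 8)) // 0: naive subtraction would wrap to ~4.29e9
    }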
+ if !oc.isPersistent && cc.enoughOutboundPeers() { + return fmt.Errorf("ConnectionController.handleOutboundConnection: Connected to maximum number of outbound "+ + "peers (%d)", cc.targetOutboundPeers) + } + + // If this is a non-persistent outbound peer and the group key + // overlaps with another peer we're already connected to then + // abort mission. We only connect to one peer per IP group in + // order to prevent Sybil attacks. + if !oc.isPersistent && cc.server.IsFromRedundantOutboundIPAddress(oc.address) { + + // TODO: Make this less verbose + return fmt.Errorf("ConnectionController.handleOutboundConnection: Rejecting OUTBOUND NON-PERSISTENT connection with "+ + "redundant group key (%s).", addrmgr.GroupKey(oc.address)) + } + + na, err := cc.ConvertIPStringToNetAddress(oc.connection.RemoteAddr().String()) + if err != nil { + return errors.Wrapf(err, "ConnectionController.handleOutboundConnection: Problem calling ipToNetAddr "+ + "for addr: (%s)", oc.connection.RemoteAddr().String()) + } + + remoteNode := NewRemoteNode() + if err := remoteNode.ConnectOutboundPeer(oc.connection, na, 0, false, false); err != nil { + return errors.Wrapf(err, "ConnectionController.handleOutboundConnection: Problem calling ConnectOutboundPeer "+ + "for addr: (%s)", oc.connection.RemoteAddr().String()) + } + cc.rniManager.AddRemoteNode(remoteNode) + cc.rniManager.SetNonValidatorOutbound(remoteNode) + + return nil +} + +func (cc *ConnectionController) createValidatorConnection(ipStr string, pk bls.PublicKey) (_err error) { + netAddr, err := cc.ConvertIPStringToNetAddress(ipStr) + if err != nil { + return err + } + cc.rniManager.CreateValidatorConnection(netAddr, pk) + return nil +} + +func (cc *ConnectionController) createPersistentOutboundConnection(ipStr string) (_err error) { + netAddr, err := cc.ConvertIPStringToNetAddress(ipStr) + if err != nil { + return err + } + cc.rniManager.CreatePersistentOutboundConnectionNetAddress(netAddr) + return nil +} + +func (cc *ConnectionController) ConvertIPStringToNetAddress(ipStr string) (*wire.NetAddress, error) { + netAddr, err := IPToNetAddr(ipStr, cc.AddrMgr, cc.params) + if err != nil { + return nil, errors.Wrapf(err, + "ConnectionController.ConvertIPStringToNetAddress: Problem parsing "+ + "ipString to wire.NetAddress") + } + if netAddr == nil { + return nil, fmt.Errorf("ConnectionController.ConvertIPStringToNetAddress: " + + "address was nil after parsing") + } + return netAddr, nil +} + +func IPToNetAddr(ipStr string, addrMgr *addrmgr.AddrManager, params *DeSoParams) (*wire.NetAddress, error) { + port := params.DefaultSocketPort + host, portstr, err := net.SplitHostPort(ipStr) + if err != nil { + // No port specified so leave port=default and set + // host to the ipStr. + host = ipStr + } else { + pp, err := strconv.ParseUint(portstr, 10, 16) + if err != nil { + return nil, errors.Wrapf(err, "IPToNetAddr: Can not parse port from %s for ip", ipStr) + } + port = uint16(pp) + } + netAddr, err := addrMgr.HostToNetAddress(host, port, 0) + if err != nil { + return nil, errors.Wrapf(err, "IPToNetAddr: Can not parse port from %s for ip", ipStr) + } + return netAddr, nil +} + +func (cc *ConnectionController) isFromRedundantInboundIPAddress(addr net.Addr) bool { + netAddr, err := IPToNetAddr(addr.String(), cc.AddrMgr, cc.params) + if err != nil { + // Return true in case we have an error. We do this because it + // will result in the peer connection not being accepted, which + // is desired in this case. 
+ glog.Warningf(errors.Wrapf(err, + "ConnectionController._isFromRedundantInboundIPAddress: Problem parsing "+ + "net.Addr to wire.NetAddress so marking as redundant and not "+ + "making connection").Error()) + return true + } + if netAddr == nil { + glog.Warningf("ConnectionController._isFromRedundantInboundIPAddress: " + + "address was nil after parsing so marking as redundant and not " + + "making connection") + return true + } + + return cc.server.IsFromRedundantInboundIPAddress(netAddr) +} + +func (cc *ConnectionController) getAddrsToBroadcast() []*SingleAddr { + cc.addrsToBroadcastLock.Lock() + defer cc.addrsToBroadcastLock.Unlock() + + // If there's nothing in the map, return. + if len(cc.addrsToBroadcast) == 0 { + return []*SingleAddr{} + } + + // If we get here then we have some addresses to broadcast. + addrsToBroadcast := []*SingleAddr{} + for uint32(len(addrsToBroadcast)) < cc.params.MaxAddressesToBroadcast && + len(cc.addrsToBroadcast) > 0 { + // Choose a key at random. This works because map iteration is random in golang. + bucket := "" + for kk := range cc.addrsToBroadcast { + bucket = kk + break + } + + // Remove the last element from the slice for the given bucket. + currentAddrList := cc.addrsToBroadcast[bucket] + if len(currentAddrList) > 0 { + lastIndex := len(currentAddrList) - 1 + currentAddr := currentAddrList[lastIndex] + currentAddrList = currentAddrList[:lastIndex] + if len(currentAddrList) == 0 { + delete(cc.addrsToBroadcast, bucket) + } else { + cc.addrsToBroadcast[bucket] = currentAddrList + } + + addrsToBroadcast = append(addrsToBroadcast, currentAddr) + } + } + + return addrsToBroadcast +} diff --git a/lib/pos_consensus_controller.go b/lib/pos_consensus_controller.go index bba9959a3..abd171390 100644 --- a/lib/pos_consensus_controller.go +++ b/lib/pos_consensus_controller.go @@ -6,8 +6,6 @@ import ( "github.com/deso-protocol/core/collections" "github.com/deso-protocol/core/consensus" "github.com/pkg/errors" - - "github.com/deso-protocol/core/consensus" ) type ConsensusController struct { diff --git a/lib/pos_handshake_controller.go b/lib/pos_handshake_controller.go index 4c904b8cf..8d06e2729 100644 --- a/lib/pos_handshake_controller.go +++ b/lib/pos_handshake_controller.go @@ -1,160 +1,91 @@ package lib import ( - "encoding/binary" - "fmt" "github.com/decred/dcrd/lru" "github.com/deso-protocol/core/bls" "github.com/golang/glog" - "golang.org/x/crypto/sha3" - "math" - "time" ) -type HandshakeMetadata struct { - versionNoncesSent uint64 - versionNoncesReceived uint64 - userAgent string - versionNegotiated bool - ServiceFlag ServiceFlag - advertisedProtocolVersion ProtocolVersionType - negotiatedProtocolVersion ProtocolVersionType - minTxFeeRateNanosPerKB uint64 - timeConnected *time.Time - timeOffsetSecs int64 - versionTimeExpected *time.Time - verackTimeExpected *time.Time - StartingBlockHeight uint32 - validatorPublicKey bls.PublicKey -} - -func (hm *HandshakeMetadata) NegotiatedProtocolVersion() ProtocolVersionType { - return hm.negotiatedProtocolVersion -} +// TODO: Replace this +func getActiveValidators() map[bls.PublicKey]*ValidatorEntry { + // TODO: replace with a getter to retrieve all active validators. 
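Aside: getAddrsToBroadcast's "choose a key at random" trick above leans on the fact that Go randomizes map iteration order, so taking the first key of a range is a cheap random pick. A minimal demonstration:

    package main

    import "fmt"

    func main() {
    	buckets := map[string][]string{
    		"1.2.3.4": {"a", "b"},
    		"5.6.7.8": {"c"},
    	}

    	// Map iteration order is unspecified and randomized between runs, so
    	// this picks a bucket at random, exactly as getAddrsToBroadcast does.
    	bucket := ""
    	for kk := range buckets {
    		bucket = kk
    		break
    	}
    	fmt.Println("chosen bucket:", bucket)
    }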
+ activeValidators := []*ValidatorEntry{} + allValidatorsMap := make(map[bls.PublicKey]*ValidatorEntry) + for _, validator := range activeValidators { + pk := validator.VotingPublicKey + if pk == nil { + continue + } + allValidatorsMap[*pk] = validator + } -func (hm *HandshakeMetadata) GetValidatorPublicKey() bls.PublicKey { - return hm.validatorPublicKey + return allValidatorsMap } type HandshakeController struct { - bc *Blockchain - srv *Server - - params *DeSoParams - minTxFeeRateNanosPerKB uint64 - hyperSync bool - posValidator bool - keystore *BLSKeystore - - handshakeMetadataMap map[uint64]*HandshakeMetadata - usedNonces lru.Cache + rniManager *RemoteNodeIndexerManager + usedNonces lru.Cache + protocolOnProofOfStake func() bool // TODO + getActiveValidators func() map[bls.PublicKey]*ValidatorEntry // TODO } -func NewHandshakeController(bc *Blockchain, srv *Server, params *DeSoParams, minTxFeeRateNanosPerKB uint64, - hyperSync bool, signer *BLSKeystore) *HandshakeController { +func NewHandshakeController(rniManager *RemoteNodeIndexerManager) *HandshakeController { vm := &HandshakeController{ - bc: bc, - srv: srv, - params: params, - minTxFeeRateNanosPerKB: minTxFeeRateNanosPerKB, - hyperSync: hyperSync, - keystore: signer, - handshakeMetadataMap: make(map[uint64]*HandshakeMetadata), - usedNonces: lru.NewCache(1000), - } - - if signer != nil { - vm.posValidator = true + rniManager: rniManager, + usedNonces: lru.NewCache(1000), } return vm } -func (hc *HandshakeController) GetHandshakeMetadata(peerId uint64) *HandshakeMetadata { - return hc.getHandshakeMetadata(peerId) -} +func (hc *HandshakeController) handlePoSHandshakePeerMessage(origin *Peer, desoMsg DeSoMessage) { + if !hc.protocolOnProofOfStake() { + return + } -func (hc *HandshakeController) getHandshakeMetadata(peerId uint64) *HandshakeMetadata { - if _, exists := hc.handshakeMetadataMap[peerId]; !exists { - hc.handshakeMetadataMap[peerId] = &HandshakeMetadata{} + // Get the handshake information of this peer. + rn := hc.rniManager.GetRemoteNodeFromPeer(origin) + if rn == nil { + return } - return hc.handshakeMetadataMap[peerId] -} -func (hc *HandshakeController) _handleNewPeerMessage(origin *Peer, desoMsg DeSoMessage) { - if desoMsg.GetMsgType() != MsgTypeNewPeer { + handshakeMetadata := rn.GetHandshakeMetadata() + // Make sure the peer is on the right proof of stake version + if handshakeMetadata.NegotiatedProtocolVersion() != ProtocolVersion2 { + // Disconnect the peer because we only accept validators running proof of stake. + hc.rniManager.Disconnect(rn) + //# cc.removeValidator(origin) return } - // TODO: Do we want to reject peers outright if their BLS key is not in the validator set? Or is it okay to reject - // them in consensus upon handshake completion. - if origin.IsOutbound() { - vMeta := hc.getHandshakeMetadata(origin.ID) - versionTimeExpected := time.Now().Add(hc.params.VersionNegotiationTimeout) - vMeta.versionTimeExpected = &versionTimeExpected - hc.sendVersion(origin.ID) + // Get all active validators and see if the peer is one of them. + activeValidators := hc.getActiveValidators() + validatorPk := handshakeMetadata.GetValidatorPublicKey() + + // If there's already a validator connected with the same public key, disconnect the peer. 
+ if _, ok := hc.rniManager.GetRemoteNodeIndexer().GetValidatorIndex().Get(validatorPk); ok { + hc.rniManager.Disconnect(rn) + return } -} -func (hc *HandshakeController) _handleDonePeerMessage(origin *Peer, desoMsg DeSoMessage) { - if desoMsg.GetMsgType() != MsgTypeDonePeer { + // If the peer is not an active validator, there is nothing else to check so return. + if _, ok := activeValidators[validatorPk]; !ok { return } - delete(hc.handshakeMetadataMap, origin.ID) + // So we know this peer is an active validator. Add it to the validator index. + hc.rniManager.SetValidator(validatorPk, rn) } -func (hc *HandshakeController) sendVersion(peerId uint64) { - // For an outbound peer, we send a version message and then wait to - // hear back for one. - verMsg := hc.newVersionMessage() - - // Record the nonce of this version message before we send it so we can - // detect self connections and so we can validate that the peer actually - // controls the IP she's supposedly communicating to us from. - vMeta := hc.getHandshakeMetadata(peerId) - vMeta.versionNoncesSent = verMsg.Nonce - hc.usedNonces.Add(verMsg.Nonce) - - if err := hc.srv.SendMessage(verMsg, peerId); err != nil { - glog.Errorf("sendVersion: Problem sending version message to peer (id= %d): %v", peerId, err) +func (hc *HandshakeController) _handleHandshakePeerMessage(origin *Peer, desoMsg DeSoMessage) { + if desoMsg.GetMsgType() != MsgTypeHandshakePeer { + return } -} -func (hc *HandshakeController) newVersionMessage() *MsgDeSoVersion { - ver := NewMessage(MsgTypeVersion).(*MsgDeSoVersion) - - ver.Version = hc.params.ProtocolVersion.ToUint64() - ver.TstampSecs = time.Now().Unix() - // We use an int64 instead of a uint64 for convenience but - // this should be fine since we're just looking to generate a - // unique value. - ver.Nonce = uint64(RandInt64(math.MaxInt64)) - ver.UserAgent = hc.params.UserAgent - // TODO: Right now all peers are full nodes. Later on we'll want to change this, - // at which point we'll need to do a little refactoring. - ver.Services = SFFullNodeDeprecated - if hc.hyperSync { - ver.Services |= SFHyperSync - } - if hc.bc.archivalMode { - ver.Services |= SFArchivalNode - } - if hc.posValidator { - ver.Services |= SFPosValidator + if hc.protocolOnProofOfStake() { + hc.handlePoSHandshakePeerMessage(origin, desoMsg) } - - // When a node asks you for what height you have, you should reply with - // the height of the latest actual block you have. This makes it so that - // peers who have up-to-date headers but missing blocks won't be considered - // for initial block download. - ver.StartBlockHeight = uint32(hc.bc.BlockTip().Header.Height) - - // Set the minimum fee rate the peer will accept. 
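Aside: the Services field assembled in the removed newVersionMessage below is a plain bitmask. A sketch of how the flags compose and are tested; the flag values here are assumptions for illustration only, since the real constants are defined elsewhere in the codebase:

    package main

    import "fmt"

    type ServiceFlag uint64

    // Hypothetical flag values for illustration.
    const (
    	SFFullNodeDeprecated ServiceFlag = 1 << iota
    	SFHyperSync
    	SFArchivalNode
    	SFPosValidator
    )

    func main() {
    	// Compose the advertised services, as newVersionMessage does.
    	services := SFFullNodeDeprecated
    	services |= SFHyperSync
    	services |= SFPosValidator

    	// Test individual capabilities on the receiving side.
    	fmt.Println(services&SFHyperSync != 0)    // true: advertised
    	fmt.Println(services&SFArchivalNode != 0) // false: not advertised
    }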
- ver.MinFeeRateNanosPerKB = hc.minTxFeeRateNanosPerKB - - return ver } func (hc *HandshakeController) _handleVersionMessage(origin *Peer, desoMsg DeSoMessage) { @@ -162,25 +93,15 @@ func (hc *HandshakeController) _handleVersionMessage(origin *Peer, desoMsg DeSoM return } - var verMsg *MsgDeSoVersion - var ok bool - if verMsg, ok = desoMsg.(*MsgDeSoVersion); !ok { - hc.srv.CloseConnection(origin.ID) + rn := hc.rniManager.GetRemoteNodeFromPeer(origin) + if rn == nil { return } - if verMsg.Version < hc.params.MinProtocolVersion { - glog.V(1).Infof("HandshakeController._handleVersionMessage: Requesting PeerDisconnect for id: (%v) protocol version "+ - "too low: %d (min: %v)", origin.ID, verMsg.Version, hc.params.MinProtocolVersion) - hc.srv.CloseConnection(origin.ID) - return - } - - vMeta := hc.getHandshakeMetadata(origin.ID) - if vMeta.versionTimeExpected != nil && vMeta.versionTimeExpected.Before(time.Now()) { - glog.V(1).Infof("HandshakeController._handleVersionMessage: Requesting PeerDisconnect for id: (%v) "+ - "version timeout. Time expected: %v, now: %v", origin.ID, vMeta.versionTimeExpected.UnixMicro(), time.Now().UnixMicro()) - hc.srv.CloseConnection(origin.ID) + var verMsg *MsgDeSoVersion + var ok bool + if verMsg, ok = desoMsg.(*MsgDeSoVersion); !ok { + hc.rniManager.Disconnect(rn) return } @@ -191,75 +112,13 @@ func (hc *HandshakeController) _handleVersionMessage(origin *Peer, desoMsg DeSoM hc.usedNonces.Delete(msgNonce) glog.V(1).Infof("HandshakeController._handleVersionMessage: Requesting PeerDisconnect for id: (%v) "+ "nonce collision", origin.ID) - hc.srv.CloseConnection(origin.ID) - return - } - // Save the version nonce so we can include it in our verack message. - vMeta.versionNoncesReceived = msgNonce - - // Set the peer info-related fields. - vMeta.userAgent = verMsg.UserAgent - vMeta.ServiceFlag = verMsg.Services - vMeta.advertisedProtocolVersion = NewProtocolVersionType(verMsg.Version) - negotiatedVersion := hc.params.ProtocolVersion - if verMsg.Version < hc.params.ProtocolVersion.ToUint64() { - negotiatedVersion = NewProtocolVersionType(verMsg.Version) - } - vMeta.negotiatedProtocolVersion = negotiatedVersion - vMeta.minTxFeeRateNanosPerKB = verMsg.MinFeeRateNanosPerKB - timeConnected := time.Unix(verMsg.TstampSecs, 0) - vMeta.timeConnected = &timeConnected - vMeta.timeOffsetSecs = verMsg.TstampSecs - time.Now().Unix() - vMeta.StartingBlockHeight = verMsg.StartBlockHeight - - // Update the timeSource now that we've gotten a version message from the peer. - hc.srv.AddTimeSample(origin.Address(), timeConnected) - - if !origin.IsOutbound() { - // Respond to the version message if this is an inbound peer. - hc.sendVersion(origin.ID) - } - // After sending and receiving a compatible version, complete the - // negotiation by sending and receiving a verack message. 
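Aside: the version negotiation rule in the hunk above — both sides settle on the lower of the two advertised protocol versions — in isolation:

    package main

    import "fmt"

    // negotiatedVersion mirrors _handleVersionMessage: keep our own version
    // unless the peer advertises a lower one.
    func negotiatedVersion(ours, theirs uint64) uint64 {
    	if theirs < ours {
    		return theirs
    	}
    	return ours
    }

    func main() {
    	fmt.Println(negotiatedVersion(2, 1)) // 1
    	fmt.Println(negotiatedVersion(2, 3)) // 2
    }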
- verackTimeExpected := time.Now().Add(hc.params.VersionNegotiationTimeout) - vMeta.verackTimeExpected = &verackTimeExpected - if err := hc.sendVerack(origin.ID); err != nil { - glog.Errorf("HandshakeController._handleVersionMessage: Problem sending verack message to peer (id= %d): %v", origin.ID, err) - hc.srv.CloseConnection(origin.ID) + rn.Disconnect() return } -} -func (hc *HandshakeController) newVerackMessage(peerId uint64) (*MsgDeSoVerack, error) { - verack := NewMessage(MsgTypeVerack).(*MsgDeSoVerack) - vMeta := hc.getHandshakeMetadata(peerId) - - // Include the nonce we received in the peer's version message - verack.NonceReceived = vMeta.versionNoncesReceived - if vMeta.negotiatedProtocolVersion == ProtocolVersion2 { - var err error - verack.Version = VerackVersion1 - verack.NonceSent = vMeta.versionNoncesSent - verack.PublicKey = hc.keystore.GetSigner().GetPublicKey() - tstampMicro := uint64(time.Now().UnixMicro()) - verack.Signature, err = hc.keystore.GetSigner().SignPoSValidatorHandshake(verack.NonceSent, verack.NonceReceived, tstampMicro) - if err != nil { - return nil, fmt.Errorf("HandshakeController.newVerackMessage: Problem signing verack message: %v", err) - } - } - return verack, nil -} - -func (hc *HandshakeController) sendVerack(peerId uint64) error { - verackMsg, err := hc.newVerackMessage(peerId) - if err != nil { - return err - } - - if err := hc.srv.SendMessage(verackMsg, peerId); err != nil { - glog.Errorf("HandshakeController.sendVerack: Problem sending verack message to peer (id= %d): %v", peerId, err) - } - return nil + rn.HandleVersionMessage(verMsg, func(versionNonce uint64) { + hc.usedNonces.Add(versionNonce) + }) } func (hc *HandshakeController) _handleVerackMessage(origin *Peer, desoMsg DeSoMessage) { @@ -267,128 +126,25 @@ func (hc *HandshakeController) _handleVerackMessage(origin *Peer, desoMsg DeSoMe return } + rn := hc.rniManager.GetRemoteNodeFromPeer(origin) + if rn == nil { + return + } + var vrkMsg *MsgDeSoVerack var ok bool if vrkMsg, ok = desoMsg.(*MsgDeSoVerack); !ok { - hc.srv.CloseConnection(origin.ID) + hc.rniManager.Disconnect(rn) return } - vMeta := hc.getHandshakeMetadata(origin.ID) - nonceReceived := vMeta.versionNoncesReceived - nonceSent := vMeta.versionNoncesSent if !ok { glog.V(1).Infof("HandshakeController._handleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ "nonce not found for peer", origin.ID) - hc.srv.CloseConnection(origin.ID) - return - } - if vMeta.verackTimeExpected != nil && vMeta.verackTimeExpected.Before(time.Now()) { - glog.V(1).Infof("HandshakeController._handleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ - "verack timeout. Time expected: %v, now: %v", origin.ID, vMeta.verackTimeExpected.UnixMicro(), time.Now().UnixMicro()) - hc.srv.CloseConnection(origin.ID) + rn.Disconnect() return } - // If the verack message has a nonce unseen for us, then request peer disconnect. - // In legacy code we compared the msg nonce to both the sent nonce and the received nonce. - if vrkMsg.NonceReceived != nonceSent && vrkMsg.NonceReceived != nonceReceived { - glog.V(1).Infof("HandshakeController._handleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ - "nonce mismatch; message: %v; nonceSent: %v; nonceReceived: %v", origin.ID, vrkMsg.NonceReceived, nonceSent, nonceReceived) - hc.srv.CloseConnection(origin.ID) - return - } - if vMeta.negotiatedProtocolVersion == ProtocolVersion2 { - // Verify that the verack message is formatted correctly. 
- if vrkMsg.Version != VerackVersion1 { - glog.V(1).Infof("HandshakeController._handleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ - "verack version mismatch; message: %v; expected: %v", origin.ID, vrkMsg.Version, VerackVersion1) - hc.srv.CloseConnection(origin.ID) - return - } - // Verify that the counterparty's verack message's NonceSent matches the NonceReceived we sent. - if vrkMsg.NonceSent != nonceReceived { - glog.V(1).Infof("HandshakeController._handleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ - "verack nonce mismatch; message: %v; expected: %v", origin.ID, vrkMsg.NonceSent, nonceReceived) - hc.srv.CloseConnection(origin.ID) - return - } - // Verify that the counterparty's verack message's NonceReceived matches the NonceSent we sent. - if vrkMsg.NonceReceived != nonceSent { - glog.V(1).Infof("HandshakeController._handleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ - "verack nonce mismatch; message: %v; expected: %v", origin.ID, vrkMsg.NonceReceived, nonceSent) - hc.srv.CloseConnection(origin.ID) - return - } - if vrkMsg.PublicKey == nil || vrkMsg.Signature == nil { - glog.V(1).Infof("HandshakeController._handleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ - "verack public key or signature is nil", origin.ID) - hc.srv.CloseConnection(origin.ID) - return - } - - // Get a verifier with the other node's public key. - verifier, err := hc.keystore.GetVerifier(vrkMsg.PublicKey) - if err != nil { - glog.V(1).Infof("HandshakeController._handleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ - "verack public key not found", origin.ID) - hc.srv.CloseConnection(origin.ID) - return - } - - // Get the current time in microseconds and make sure the verack message's timestamp is within 15 minutes of it. - timeNowMicro := uint64(time.Now().UnixMicro()) - if vrkMsg.TstampMicro > timeNowMicro-hc.params.HandshakeTimeoutMicroSeconds { - glog.V(1).Infof("HandshakeController._handleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ - "verack timestamp too far in the past. Time now: %v, verack timestamp: %v", origin.ID, timeNowMicro, vrkMsg.TstampMicro) - hc.srv.CloseConnection(origin.ID) - return - } - - ok, err = verifier.VerifyPoSValidatorHandshake(vrkMsg.NonceSent, vrkMsg.NonceReceived, vrkMsg.TstampMicro, vrkMsg.Signature) - if err != nil || !ok { - glog.V(1).Infof("HandshakeController._handleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ - "verack signature verification failed: %v", origin.ID, err) - hc.srv.CloseConnection(origin.ID) - return - } - vMeta.validatorPublicKey = *vrkMsg.PublicKey - } - - // If we get here then the peer has successfully completed the handshakeController. - vMeta.versionNegotiated = true - go hc.srv.SendHandshakePeerMessage(origin) - hc._logVersionSuccess(origin) + rn.HandleVerackMessage(vrkMsg) return } - -func (hc *HandshakeController) _logVersionSuccess(peer *Peer) { - inboundStr := "INBOUND" - if peer.isOutbound { - inboundStr = "OUTBOUND" - } - persistentStr := "PERSISTENT" - if !peer.isPersistent { - persistentStr = "NON-PERSISTENT" - } - logStr := fmt.Sprintf("SUCCESS version negotiation for (%s) (%s) peer (%v).", inboundStr, persistentStr, peer) - glog.V(1).Info(logStr) -} - -func GetVerackHandshakePayload(nonceReceived uint64, nonceSent uint64, tstampMicro uint64) [32]byte { - // The payload for the verack message is the two nonces concatenated together. - // We do this so that we can sign the nonces and verify the signature on the other side. 
-	nonceReceivedBytes := make([]byte, 8)
-	binary.BigEndian.PutUint64(nonceReceivedBytes, nonceReceived)
-
-	nonceSentBytes := make([]byte, 8)
-	binary.BigEndian.PutUint64(nonceSentBytes, nonceSent)
-
-	tstampBytes := make([]byte, 8)
-	binary.BigEndian.PutUint64(tstampBytes, tstampMicro)
-
-	payload := append(nonceReceivedBytes, nonceSentBytes...)
-	payload = append(payload, tstampBytes...)
-
-	return sha3.Sum256(payload)
-}
diff --git a/lib/pos_remote_node.go b/lib/pos_remote_node.go
new file mode 100644
index 000000000..8b5ce1722
--- /dev/null
+++ b/lib/pos_remote_node.go
@@ -0,0 +1,456 @@
+package lib
+
+import (
+	"encoding/binary"
+	"fmt"
+	"github.com/btcsuite/btcd/wire"
+	"github.com/deso-protocol/core/bls"
+	"github.com/golang/glog"
+	"github.com/pkg/errors"
+	"golang.org/x/crypto/sha3"
+	"math"
+	"net"
+	"time"
+)
+
+type RNConnectionType int
+
+const (
+	RNConnectionType_Unknown  RNConnectionType = 0
+	RNConnectionType_Outbound RNConnectionType = 1
+	RNConnectionType_Inbound  RNConnectionType = 2
+)
+
+type RNConnectionStatus int
+
+const (
+	RNConnectionStatus_NotConnected RNConnectionStatus = 0
+	RNConnectionStatus_Connected    RNConnectionStatus = 1
+	RNConnectionStatus_Attempted    RNConnectionStatus = 2
+	RNConnectionStatus_Terminated   RNConnectionStatus = 3
+)
+
+type RemoteNode struct {
+	peer             *Peer
+	id               RemoteNodeId
+	connectionType   RNConnectionType
+	connectionStatus RNConnectionStatus
+
+	params *DeSoParams
+
+	bc  *Blockchain
+	srv *Server
+
+	minTxFeeRateNanosPerKB uint64
+	hyperSync              bool
+	posValidator           bool
+
+	handshakeMetadata *HandshakeMetadata
+	keystore          *BLSKeystore
+}
+
+type HandshakeMetadata struct {
+	versionNoncesSent         uint64
+	versionNoncesReceived     uint64
+	userAgent                 string
+	versionNegotiated         bool
+	ServiceFlag               ServiceFlag
+	advertisedProtocolVersion ProtocolVersionType
+	negotiatedProtocolVersion ProtocolVersionType
+	minTxFeeRateNanosPerKB    uint64
+	timeConnected             *time.Time
+	timeOffsetSecs            int64
+	versionTimeExpected       *time.Time
+	verackTimeExpected        *time.Time
+	StartingBlockHeight       uint32
+	validatorPublicKey        bls.PublicKey
+}
+
+func NewRemoteNode() *RemoteNode {
+	return &RemoteNode{
+		id:                NewRemoteNodeNoId(),
+		connectionType:    RNConnectionType_Unknown,
+		connectionStatus:  RNConnectionStatus_NotConnected,
+		handshakeMetadata: nil,
+	}
+}
+
+func (rn *RemoteNode) SetConnectedPeer(peer *Peer) {
+	rn.peer = peer
+	if rn.peer == nil {
+		return
+	}
+
+	// Set connectionType
+	if rn.peer.IsOutbound() {
+		rn.connectionType = RNConnectionType_Outbound
+		rn.SetId(NewRemoteNodeOutboundId(peer.ID, peer.AttemptId()))
+	} else {
+		rn.connectionType = RNConnectionType_Inbound
+		rn.SetId(NewRemoteNodeInboundId(peer.ID))
+	}
+
+	// Set connectionStatus
+	rn.connectionStatus = RNConnectionStatus_Connected
+}
+
+func (rn *RemoteNode) GetPeer() *Peer {
+	return rn.peer
+}
+
+func (rn *RemoteNode) IsInbound() bool {
+	return rn.connectionType == RNConnectionType_Inbound
+}
+
+func (rn *RemoteNode) IsOutbound() bool {
+	return rn.connectionType == RNConnectionType_Outbound
+}
+
+func (rn *RemoteNode) SetId(id RemoteNodeId) {
+	rn.id = id
+}
+
+func (rn *RemoteNode) GetId() RemoteNodeId {
+	return rn.id
+}
+
+func (rn *RemoteNode) GetPeerId() uint64 {
+	peerId, _ := rn.id.GetIds()
+	return peerId
+}
+
+func (rn *RemoteNode) CreateOutboundConnection(netAddr *wire.NetAddress) {
+	if rn.connectionStatus != RNConnectionStatus_NotConnected {
+		return
+	}
+
+	attemptId := rn.srv.CreateOutboundConnection(netAddr)
+	id := NewRemoteNodeAttemptedId(attemptId)
+	rn.SetId(id)
+	rn.connectionStatus = RNConnectionStatus_Attempted
+}
+
+func (rn *RemoteNode) CreatePersistentOutboundConnection(netAddr *wire.NetAddress) {
+	if rn.connectionStatus != RNConnectionStatus_NotConnected {
+		return
+	}
+
+	attemptId := rn.srv.CreatePersistentOutboundConnection(netAddr)
+	id := NewRemoteNodeAttemptedId(attemptId)
+	rn.SetId(id)
+	rn.connectionStatus = RNConnectionStatus_Attempted
+}
+
+func (rn *RemoteNode) ConnectInboundPeer(conn net.Conn, na *wire.NetAddress) error {
+	peer := rn.srv.ConnectPeer(conn, na, RemoteNodeIdNoAttempt, false, false)
+	if peer == nil {
+		return errors.Errorf("ConnectInboundPeer: Problem connecting peer (%s)", conn.RemoteAddr().String())
+	}
+	rn.SetConnectedPeer(peer)
+	return nil
+}
+
+func (rn *RemoteNode) ConnectOutboundPeer(conn net.Conn, na *wire.NetAddress, attemptId uint64, isOutbound bool, isPersistent bool) error {
+	peer := rn.srv.ConnectPeer(conn, na, attemptId, isOutbound, isPersistent)
+	if peer == nil {
+		return errors.Errorf("ConnectOutboundPeer: Problem connecting peer (%s)", conn.RemoteAddr().String())
+	}
+	rn.SetConnectedPeer(peer)
+	return nil
+}
+
+func (rn *RemoteNode) Disconnect() {
+	peerId, attemptId := rn.id.GetIds()
+	switch rn.connectionStatus {
+	case RNConnectionStatus_Attempted:
+		rn.srv.CloseAttemptedConnection(attemptId)
+	case RNConnectionStatus_Connected:
+		rn.srv.CloseConnection(peerId)
+	}
+	rn.connectionStatus = RNConnectionStatus_Terminated
+}
+
+func (rn *RemoteNode) SendMessage(desoMsg DeSoMessage) {
+	if rn.connectionStatus != RNConnectionStatus_Connected {
+		return
+	}
+
+	if err := rn.srv.SendMessage(desoMsg, rn.GetPeerId()); err != nil {
+		glog.Errorf("RemoteNode.SendMessage: Problem sending message to peer (id= %d): %v", rn.peer.ID, err)
+	}
+}
+
+func (rn *RemoteNode) InitiateHandshake(reserveNonce func(versionNonce uint64)) error {
+	if rn.GetPeer() == nil {
+		return errors.Errorf("RemoteNode.InitiateHandshake: Remote node has no peer")
+	}
+
+	if rn.GetPeer().IsOutbound() {
+		vMeta := rn.GetHandshakeMetadata()
+		versionTimeExpected := time.Now().Add(rn.params.VersionNegotiationTimeout)
+		vMeta.versionTimeExpected = &versionTimeExpected
+		rn.sendVersion(reserveNonce)
+	}
+	return nil
+}
+
+func (rn *RemoteNode) GetHandshakeMetadata() *HandshakeMetadata {
+	if rn.handshakeMetadata == nil {
+		rn.handshakeMetadata = &HandshakeMetadata{}
+	}
+	return rn.handshakeMetadata
+}
+
+func (rn *RemoteNode) sendVersion(reserveNonce func(versionNonce uint64)) {
+	verMsg := rn.newVersionMessage()
+
+	// Record the nonce of this version message before we send it so we can
+	// detect self connections and so we can validate that the peer actually
+	// controls the IP she's supposedly communicating to us from.
+	vMeta := rn.GetHandshakeMetadata()
+	vMeta.versionNoncesSent = verMsg.Nonce
+	reserveNonce(verMsg.Nonce)
+
+	if err := rn.srv.SendMessage(verMsg, rn.peer.ID); err != nil {
+		glog.Errorf("sendVersion: Problem sending version message to peer (id= %d): %v", rn.peer.ID, err)
+	}
+}
+
+// newVersionMessage constructs the version message that this node sends to a
+// peer at the start of the handshake.
+func (rn *RemoteNode) newVersionMessage() *MsgDeSoVersion {
+	ver := NewMessage(MsgTypeVersion).(*MsgDeSoVersion)
+
+	ver.Version = rn.params.ProtocolVersion.ToUint64()
+	ver.TstampSecs = time.Now().Unix()
+	// We use an int64 instead of a uint64 for convenience but
+	// this should be fine since we're just looking to generate a
+	// unique value.
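+	// The nonce is echoed back to us in the peer's verack as NonceReceived, and
+	// seeing one of our own recent nonces in an incoming version message signals
+	// a self-connection (the "nonce collision" check in _handleVersionMessage).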
+ ver.Nonce = uint64(RandInt64(math.MaxInt64)) + ver.UserAgent = rn.params.UserAgent + // TODO: Right now all peers are full nodes. Later on we'll want to change this, + // at which point we'll need to do a little refactoring. + ver.Services = SFFullNodeDeprecated + if rn.hyperSync { + ver.Services |= SFHyperSync + } + if rn.bc.archivalMode { + ver.Services |= SFArchivalNode + } + if rn.posValidator { + ver.Services |= SFPosValidator + } + + // When a node asks you for what height you have, you should reply with + // the height of the latest actual block you have. This makes it so that + // peers who have up-to-date headers but missing blocks won't be considered + // for initial block download. + ver.StartBlockHeight = uint32(rn.bc.BlockTip().Header.Height) + + // Set the minimum fee rate the peer will accept. + ver.MinFeeRateNanosPerKB = rn.minTxFeeRateNanosPerKB + + return ver +} + +func (rn *RemoteNode) HandleVersionMessage(verMsg *MsgDeSoVersion, reserveNonce func(versionNonce uint64)) { + if verMsg.Version < rn.params.MinProtocolVersion { + glog.V(1).Infof("RemoteNode.HandleVersionMessage: Requesting PeerDisconnect for id: (%v) "+ + "protocol version too low. Peer version: %v, min version: %v", rn.peer.ID, verMsg.Version, rn.params.MinProtocolVersion) + rn.Disconnect() + return + } + + vMeta := rn.GetHandshakeMetadata() + if vMeta.versionTimeExpected != nil && vMeta.versionTimeExpected.Before(time.Now()) { + glog.V(1).Infof("RemoteNode.HandleVersionMessage: Requesting PeerDisconnect for id: (%v) "+ + "version timeout. Time expected: %v, now: %v", rn.peer.ID, vMeta.versionTimeExpected.UnixMicro(), time.Now().UnixMicro()) + rn.Disconnect() + return + } + + // Save the version nonce so we can include it in our verack message. + vMeta.versionNoncesReceived = verMsg.Nonce + + // Set the peer info-related fields. + vMeta.userAgent = verMsg.UserAgent + vMeta.ServiceFlag = verMsg.Services + vMeta.advertisedProtocolVersion = NewProtocolVersionType(verMsg.Version) + negotiatedVersion := rn.params.ProtocolVersion + if verMsg.Version < rn.params.ProtocolVersion.ToUint64() { + negotiatedVersion = NewProtocolVersionType(verMsg.Version) + } + vMeta.negotiatedProtocolVersion = negotiatedVersion + vMeta.minTxFeeRateNanosPerKB = verMsg.MinFeeRateNanosPerKB + timeConnected := time.Unix(verMsg.TstampSecs, 0) + vMeta.timeConnected = &timeConnected + vMeta.timeOffsetSecs = verMsg.TstampSecs - time.Now().Unix() + vMeta.StartingBlockHeight = verMsg.StartBlockHeight + + // Update the timeSource now that we've gotten a version message from the peer. + rn.srv.AddTimeSample(rn.peer.Address(), timeConnected) + + if !rn.peer.IsOutbound() { + // Respond to the version message if this is an inbound peer. + rn.sendVersion(reserveNonce) + } + // After sending and receiving a compatible version, complete the + // negotiation by sending and receiving a verack message. 
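+	// At this point both sides have exchanged version messages: each side sends
+	// a version carrying a fresh nonce and acknowledges the other side's version
+	// with a verack that echoes that nonce back. For ProtocolVersion2 validators,
+	// the verack additionally carries a BLS public key and signature.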
+ verackTimeExpected := time.Now().Add(rn.params.VersionNegotiationTimeout) + vMeta.verackTimeExpected = &verackTimeExpected + if err := rn.sendVerack(); err != nil { + glog.Errorf("RemoteNode.HandleVersionMessage: Problem sending verack message to peer (id= %d): %v", rn.peer.ID, err) + rn.Disconnect() + return + } +} + +func (rn *RemoteNode) sendVerack() error { + verackMsg, err := rn.newVerackMessage() + if err != nil { + return err + } + + if err := rn.srv.SendMessage(verackMsg, rn.peer.ID); err != nil { + glog.Errorf("RemoteNode.SendVerack: Problem sending verack message to peer (id= %d): %v", rn.peer.ID, err) + return err + } + return nil +} + +func (rn *RemoteNode) newVerackMessage() (*MsgDeSoVerack, error) { + verack := NewMessage(MsgTypeVerack).(*MsgDeSoVerack) + vMeta := rn.GetHandshakeMetadata() + + // Include the nonce we received in the peer's version message + verack.NonceReceived = vMeta.versionNoncesReceived + if vMeta.negotiatedProtocolVersion == ProtocolVersion2 { + var err error + verack.Version = VerackVersion1 + verack.NonceSent = vMeta.versionNoncesSent + verack.PublicKey = rn.keystore.GetSigner().GetPublicKey() + tstampMicro := uint64(time.Now().UnixMicro()) + verack.Signature, err = rn.keystore.GetSigner().SignPoSValidatorHandshake(verack.NonceSent, verack.NonceReceived, tstampMicro) + if err != nil { + return nil, fmt.Errorf("RemoteNode.newVerackMessage: Problem signing verack message: %v", err) + } + } + return verack, nil +} + +func (rn *RemoteNode) HandleVerackMessage(vrkMsg *MsgDeSoVerack) { + vMeta := rn.GetHandshakeMetadata() + nonceReceived := vMeta.versionNoncesReceived + nonceSent := vMeta.versionNoncesSent + + if vMeta.verackTimeExpected != nil && vMeta.verackTimeExpected.Before(time.Now()) { + glog.V(1).Infof("RemoteNode.HandleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ + "verack timeout. Time expected: %v, now: %v", rn.peer.ID, vMeta.verackTimeExpected.UnixMicro(), time.Now().UnixMicro()) + rn.srv.CloseConnection(rn.peer.ID) + return + } + // If the verack message has a nonce unseen for us, then request peer disconnect. + if vrkMsg.NonceReceived != nonceSent && vrkMsg.NonceReceived != nonceReceived { + glog.V(1).Infof("RemoteNode.HandleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ + "nonce mismatch; message: %v; nonceSent: %v; nonceReceived: %v", rn.peer.ID, vrkMsg.NonceReceived, nonceSent, nonceReceived) + rn.srv.CloseConnection(rn.peer.ID) + return + } + if vMeta.negotiatedProtocolVersion == ProtocolVersion2 { + // Verify that the verack message is formatted correctly. + if vrkMsg.Version != VerackVersion1 { + glog.V(1).Infof("RemoteNode.HandleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ + "verack version mismatch; message: %v; expected: %v", rn.peer.ID, vrkMsg.Version, VerackVersion1) + rn.srv.CloseConnection(rn.peer.ID) + return + } + // Verify that the counterparty's verack message's NonceSent matches the NonceReceived we sent. + if vrkMsg.NonceSent != nonceReceived { + glog.V(1).Infof("RemoteNode.HandleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ + "verack nonce mismatch; message: %v; expected: %v", rn.peer.ID, vrkMsg.NonceSent, nonceReceived) + rn.srv.CloseConnection(rn.peer.ID) + return + } + // Verify that the counterparty's verack message's NonceReceived matches the NonceSent we sent. 
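+		// Together with the NonceSent check above, this binds the verack to both
+		// halves of this particular version exchange, so a verack captured from a
+		// different session cannot simply be replayed here.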
+ if vrkMsg.NonceReceived != nonceSent { + glog.V(1).Infof("RemoteNode.HandleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ + "verack nonce mismatch; message: %v; expected: %v", rn.peer.ID, vrkMsg.NonceReceived, nonceSent) + rn.srv.CloseConnection(rn.peer.ID) + return + } + if vrkMsg.PublicKey == nil || vrkMsg.Signature == nil { + glog.V(1).Infof("RemoteNode.HandleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ + "verack public key or signature is nil", rn.peer.ID) + rn.srv.CloseConnection(rn.peer.ID) + return + } + + // Get the current time in microseconds and make sure the verack message's timestamp is within 15 minutes of it. + timeNowMicro := uint64(time.Now().UnixMicro()) + if vrkMsg.TstampMicro > timeNowMicro-rn.params.HandshakeTimeoutMicroSeconds { + glog.V(1).Infof("RemoteNode.HandleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ + "verack timestamp too far in the past. Time now: %v, verack timestamp: %v", rn.peer.ID, timeNowMicro, vrkMsg.TstampMicro) + rn.srv.CloseConnection(rn.peer.ID) + return + } + + ok, err := BLSVerifyPoSValidatorHandshake(vrkMsg.NonceSent, vrkMsg.NonceReceived, vrkMsg.TstampMicro, + vrkMsg.Signature, vrkMsg.PublicKey) + if err != nil || !ok { + glog.V(1).Infof("RemoteNode.HandleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ + "verack signature verification failed: %v", rn.peer.ID, err) + rn.srv.CloseConnection(rn.peer.ID) + return + } + vMeta.validatorPublicKey = *vrkMsg.PublicKey + } + + // If we get here then the peer has successfully completed the handshake. + vMeta.versionNegotiated = true + rn._logVersionSuccess(rn.peer) + rn.srv.SendHandshakePeerMessage(rn.peer) +} + +func (rn *RemoteNode) _logVersionSuccess(peer *Peer) { + inboundStr := "INBOUND" + if peer.isOutbound { + inboundStr = "OUTBOUND" + } + persistentStr := "PERSISTENT" + if !peer.isPersistent { + persistentStr = "NON-PERSISTENT" + } + logStr := fmt.Sprintf("SUCCESS version negotiation for (%s) (%s) peer (%v).", inboundStr, persistentStr, peer) + glog.V(1).Info(logStr) +} + +func (hm *HandshakeMetadata) NegotiatedProtocolVersion() ProtocolVersionType { + return hm.negotiatedProtocolVersion +} + +func (hm *HandshakeMetadata) GetValidatorPublicKey() bls.PublicKey { + return hm.validatorPublicKey +} + +func GetVerackHandshakePayload(nonceReceived uint64, nonceSent uint64, tstampMicro uint64) [32]byte { + // The payload for the verack message is the two nonces concatenated together. + // We do this so that we can sign the nonces and verify the signature on the other side. + nonceReceivedBytes := make([]byte, 8) + binary.BigEndian.PutUint64(nonceReceivedBytes, nonceReceived) + + nonceSentBytes := make([]byte, 8) + binary.BigEndian.PutUint64(nonceSentBytes, nonceSent) + + tstampBytes := make([]byte, 8) + binary.BigEndian.PutUint64(tstampBytes, tstampMicro) + + payload := append(nonceReceivedBytes, nonceSentBytes...) + payload = append(payload, tstampBytes...) 
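+
+	// The resulting preimage is a fixed 24-byte layout: nonceReceived, nonceSent,
+	// and tstampMicro, each encoded as 8 big-endian bytes, hashed with SHA3-256.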
+
+	return sha3.Sum256(payload)
+}
diff --git a/lib/pos_remote_node_id.go b/lib/pos_remote_node_id.go
new file mode 100644
index 000000000..6792fe321
--- /dev/null
+++ b/lib/pos_remote_node_id.go
@@ -0,0 +1,57 @@
+package lib
+
+const (
+	RemoteNodeIdNoPeer    = 0
+	RemoteNodeIdNoAttempt = 0
+)
+
+type RemoteNodeId struct {
+	peerId    uint64
+	attemptId uint64
+}
+
+func (id RemoteNodeId) GetIds() (peerId uint64, attemptId uint64) {
+	return id.peerId, id.attemptId
+}
+
+func NewRemoteNodeId(peerId uint64, attemptId uint64) RemoteNodeId {
+	return RemoteNodeId{
+		peerId:    peerId,
+		attemptId: attemptId,
+	}
+}
+
+func NewRemoteNodeOutboundId(peerId uint64, attemptId uint64) RemoteNodeId {
+	return NewRemoteNodeId(peerId, attemptId)
+}
+
+func NewRemoteNodeInboundId(peerId uint64) RemoteNodeId {
+	return RemoteNodeId{
+		peerId:    peerId,
+		attemptId: RemoteNodeIdNoAttempt,
+	}
+}
+
+func NewRemoteNodeAttemptedId(attemptId uint64) RemoteNodeId {
+	return RemoteNodeId{
+		peerId:    RemoteNodeIdNoPeer,
+		attemptId: attemptId,
+	}
+}
+
+func NewRemoteNodeNoId() RemoteNodeId {
+	return RemoteNodeId{
+		peerId:    RemoteNodeIdNoPeer,
+		attemptId: RemoteNodeIdNoAttempt,
+	}
+}
+
+func CompareRemoteNodeId(id1 RemoteNodeId, id2 RemoteNodeId) (_equal bool) {
+	peerId1, attemptId1 := id1.GetIds()
+	peerId2, attemptId2 := id2.GetIds()
+	return peerId1 == peerId2 && attemptId1 == attemptId2
+}
+
+func EqualsRemoteNodeNoId(id RemoteNodeId) bool {
+	return CompareRemoteNodeId(id, NewRemoteNodeNoId())
+}
diff --git a/lib/pos_remote_node_indexer.go b/lib/pos_remote_node_indexer.go
new file mode 100644
index 000000000..1141f8451
--- /dev/null
+++ b/lib/pos_remote_node_indexer.go
@@ -0,0 +1,231 @@
+package lib
+
+import "github.com/deso-protocol/core/bls"
+
+// RNIndexId is a custom string type used for identifying different types of remote node indices.
+type RNIndexId string
+
+// Constants for different types of remote node indices.
+const (
+	RNIndexId_Validator              RNIndexId = "VALIDATOR"
+	RNIndexId_ValidatorAttempted     RNIndexId = "VALIDATOR_ATTEMPTED"
+	RNIndexId_NonValidator_Outbound  RNIndexId = "NONVALIDATOR_OUTBOUND"
+	RNIndexId_NonValidator_Inbound   RNIndexId = "NONVALIDATOR_INBOUND"
+	RNIndexId_NonValidator_Attempted RNIndexId = "NONVALIDATOR_ATTEMPTED"
+)
+
+// RemoteNodeIndexer is a structure that holds information about all remote nodes and their indices.
+type RemoteNodeIndexer struct {
+	// AllRemoteNodes is a map storing all remote nodes by their IDs.
+	AllRemoteNodes map[RemoteNodeId]*RemoteNode
+
+	// Indices for various types of remote nodes.
+	ValidatorIndex             *RemoteNodeIndex[bls.PublicKey]
+	ValidatorAttemptedIndex    *RemoteNodeIndex[bls.PublicKey]
+	NonValidatorOutboundIndex  *RemoteNodeIndex[RemoteNodeId]
+	NonValidatorInboundIndex   *RemoteNodeIndex[RemoteNodeId]
+	NonValidatorAttemptedIndex *RemoteNodeIndex[RemoteNodeId]
+
+	// RemoteNodeToIndexIdList maps remote nodes to their corresponding index IDs.
+	RemoteNodeToIndexIdList map[*RemoteNode][]RNIndexId
+
+	RemoteNodeToIndexRemoveFuncList map[*RemoteNode][]RemoteNodeIndexRemoveFunc
+}
+
+// NewRemoteNodeIndexer initializes and returns a new instance of RemoteNodeIndexer.
+func NewRemoteNodeIndexer() *RemoteNodeIndexer {
+	rni := &RemoteNodeIndexer{
+		AllRemoteNodes:          make(map[RemoteNodeId]*RemoteNode),
+		RemoteNodeToIndexIdList: make(map[*RemoteNode][]RNIndexId),
+		// The remove-func map must be initialized as well: updateCallback assigns
+		// into it on every Add, and assigning into a nil map panics.
+		RemoteNodeToIndexRemoveFuncList: make(map[*RemoteNode][]RemoteNodeIndexRemoveFunc),
+	}
+
+	// Initializing various indices with their respective types and update callback.
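+	// Each index shares the indexer's updateCallback, which maintains the reverse
+	// maps above so that RemoveRemoteNode can evict a node from every index it
+	// was added to.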
+ rni.ValidatorIndex = NewRemoteNodeIndex[bls.PublicKey](RNIndexId_Validator, rni.updateCallback) + rni.ValidatorAttemptedIndex = NewRemoteNodeIndex[bls.PublicKey](RNIndexId_ValidatorAttempted, rni.updateCallback) + rni.NonValidatorOutboundIndex = NewRemoteNodeIndex[RemoteNodeId](RNIndexId_NonValidator_Outbound, rni.updateCallback) + rni.NonValidatorInboundIndex = NewRemoteNodeIndex[RemoteNodeId](RNIndexId_NonValidator_Inbound, rni.updateCallback) + rni.NonValidatorAttemptedIndex = NewRemoteNodeIndex[RemoteNodeId](RNIndexId_NonValidator_Attempted, rni.updateCallback) + return rni +} + +// Getter methods for accessing the different indices. +func (rni *RemoteNodeIndexer) GetValidatorIndex() RemoteNodeIndexInterface[bls.PublicKey] { + return rni.ValidatorIndex +} + +func (rni *RemoteNodeIndexer) GetValidatorAttemptedIndex() RemoteNodeIndexInterface[bls.PublicKey] { + return rni.ValidatorAttemptedIndex +} + +func (rni *RemoteNodeIndexer) GetNonValidatorOutboundIndex() RemoteNodeIndexInterface[RemoteNodeId] { + return rni.NonValidatorOutboundIndex +} + +func (rni *RemoteNodeIndexer) GetNonValidatorInboundIndex() RemoteNodeIndexInterface[RemoteNodeId] { + return rni.NonValidatorInboundIndex +} + +func (rni *RemoteNodeIndexer) GetNonValidatorAttemptedIndex() RemoteNodeIndexInterface[RemoteNodeId] { + return rni.NonValidatorAttemptedIndex +} + +// Getter methods for AllRemoteNodes +func (rni *RemoteNodeIndexer) GetRemoteNodeFromPeer(peer *Peer) *RemoteNode { + if peer == nil { + return nil + } + + id := peer.ID + attemptId := peer.AttemptId() + remoteNodeId := NewRemoteNodeId(id, attemptId) + + rn, ok := rni.AllRemoteNodes[remoteNodeId] + if !ok { + return nil + } + return rn +} + +func (rni *RemoteNodeIndexer) SetRemoteNode(rn *RemoteNode) { + if rn == nil || EqualsRemoteNodeNoId(rn.GetId()) { + return + } + + rni.AllRemoteNodes[rn.GetId()] = rn +} + +func (rni *RemoteNodeIndexer) RemoveRemoteNode(rn *RemoteNode) { + if rn == nil || EqualsRemoteNodeNoId(rn.GetId()) { + return + } + + delete(rni.AllRemoteNodes, rn.GetId()) + if _, ok := rni.RemoteNodeToIndexRemoveFuncList[rn]; !ok { + return + } + + for _, removeFunc := range rni.RemoteNodeToIndexRemoveFuncList[rn] { + if removeFunc == nil { + continue + } + removeFunc() + } +} + +// updateCallback is invoked when a node is added or removed from an index. +func (rni *RemoteNodeIndexer) updateCallback(id RNIndexId, node *RemoteNode, isAdd bool, removeFunc RemoteNodeIndexRemoveFunc) { + if isAdd { + rni.RemoteNodeToIndexIdList[node] = append(rni.RemoteNodeToIndexIdList[node], id) + rni.RemoteNodeToIndexRemoveFuncList[node] = append(rni.RemoteNodeToIndexRemoveFuncList[node], removeFunc) + } else { + var indexId RNIndexId + pos := 0 + + for pos, indexId = range rni.RemoteNodeToIndexIdList[node] { + if indexId == id { + rni.RemoteNodeToIndexIdList[node] = append(rni.RemoteNodeToIndexIdList[node][:pos], rni.RemoteNodeToIndexIdList[node][pos+1:]...) + break + } + } + + rni.RemoteNodeToIndexRemoveFuncList[node] = append(rni.RemoteNodeToIndexRemoveFuncList[node][:pos], rni.RemoteNodeToIndexRemoveFuncList[node][pos+1:]...) + } +} + +// RemoteNodeIndexInterface defines the methods for a remote node index. +type RemoteNodeIndexInterface[Key comparable] interface { + GetId() RNIndexId + Add(key Key, node *RemoteNode) + Remove(key Key) + Get(key Key) (*RemoteNode, bool) + GetRandom() (*RemoteNode, bool) + GetIndex() map[Key]*RemoteNode + GetAll() []*RemoteNode +} + +// RemoteNodeIndex holds an index of remote nodes by a specific key type. 
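+//
+// A minimal usage sketch (illustrative only; rn is assumed to be an existing
+// *RemoteNode and msg a DeSoMessage; the update callback may be nil):
+//
+//	idx := NewRemoteNodeIndex[RemoteNodeId](RNIndexId_NonValidator_Inbound, nil)
+//	idx.Add(rn.GetId(), rn)
+//	if node, ok := idx.Get(rn.GetId()); ok {
+//		node.SendMessage(msg)
+//	}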
+type RemoteNodeIndex[Key comparable] struct { + Id RNIndexId + Index map[Key]*RemoteNode + updateCallback RemoteNodeIndexCallback +} + +// RemoteNodeIndexRemoveFunc is a function type for removal of a RemoteNode from the Index. +type RemoteNodeIndexRemoveFunc func() + +// RemoteNodeIndexCallback is a function type for update callbacks. +type RemoteNodeIndexCallback func(id RNIndexId, node *RemoteNode, isAdd bool, removeFunc RemoteNodeIndexRemoveFunc) + +// NewRemoteNodeIndex creates and returns a new RemoteNodeIndex. +func NewRemoteNodeIndex[Key comparable](id RNIndexId, updateCallback RemoteNodeIndexCallback) *RemoteNodeIndex[Key] { + return &RemoteNodeIndex[Key]{ + Id: id, + Index: make(map[Key]*RemoteNode), + updateCallback: updateCallback, + } +} + +// Implementations of the RemoteNodeIndexInterface methods. +func (rni *RemoteNodeIndex[Key]) GetId() RNIndexId { + return rni.Id +} + +func (rni *RemoteNodeIndex[Key]) Add(key Key, node *RemoteNode) { + rni.Index[key] = node + + // Define the remove function + removeFunc := func() { + if _, ok := rni.Index[key]; !ok { + return + } + delete(rni.Index, key) + } + + if rni.updateCallback != nil { + rni.updateCallback(rni.Id, node, true, removeFunc) + } +} + +func (rni *RemoteNodeIndex[Key]) Remove(key Key) { + rn, ok := rni.Index[key] + if !ok { + return + } + delete(rni.Index, key) + if rni.updateCallback != nil { + rni.updateCallback(rni.Id, rn, false, nil) + } +} + +func (rni *RemoteNodeIndex[Key]) Get(key Key) (*RemoteNode, bool) { + elem, ok := rni.Index[key] + return elem, ok +} + +func (rni *RemoteNodeIndex[Key]) GetRandom() (*RemoteNode, bool) { + if len(rni.Index) == 0 { + return nil, false + } + + var node *RemoteNode + for _, node = range rni.Index { + break + } + return node, true +} + +func (rni *RemoteNodeIndex[Key]) GetIndex() map[Key]*RemoteNode { + index := make(map[Key]*RemoteNode) + for key, node := range rni.Index { + index[key] = node + } + return index +} + +func (rni *RemoteNodeIndex[Key]) GetAll() []*RemoteNode { + var nodes []*RemoteNode + for _, node := range rni.Index { + nodes = append(nodes, node) + } + return nodes +} diff --git a/lib/pos_remote_node_indexer_manager.go b/lib/pos_remote_node_indexer_manager.go new file mode 100644 index 000000000..0ac7c8295 --- /dev/null +++ b/lib/pos_remote_node_indexer_manager.go @@ -0,0 +1,165 @@ +package lib + +import ( + "github.com/btcsuite/btcd/wire" + "github.com/deso-protocol/core/bls" +) + +type RemoteNodeIndexerManager struct { + remoteNodeIndexer *RemoteNodeIndexer +} + +func NewRemoteNodeIndexerManager() *RemoteNodeIndexerManager { + return &RemoteNodeIndexerManager{ + remoteNodeIndexer: NewRemoteNodeIndexer(), + } +} + +func (manager *RemoteNodeIndexerManager) GetRemoteNodeIndexer() *RemoteNodeIndexer { + return manager.remoteNodeIndexer +} + +func (manager *RemoteNodeIndexerManager) GetRemoteNodeFromPeer(peer *Peer) *RemoteNode { + return manager.GetRemoteNodeIndexer().GetRemoteNodeFromPeer(peer) +} + +func (manager *RemoteNodeIndexerManager) DisconnectPeer(peer *Peer) { + rn := manager.GetRemoteNodeFromPeer(peer) + if rn == nil { + return + } + + manager.Disconnect(rn) +} + +func (manager *RemoteNodeIndexerManager) Disconnect(rn *RemoteNode) { + rn.Disconnect() + manager.GetRemoteNodeIndexer().RemoveRemoteNode(rn) +} + +func (manager *RemoteNodeIndexerManager) RemovePeer(peer *Peer) { + rn := manager.GetRemoteNodeFromPeer(peer) + manager.GetRemoteNodeIndexer().RemoveRemoteNode(rn) +} + +func (manager *RemoteNodeIndexerManager) SendMessageToPeer(peer *Peer, 
desoMessage DeSoMessage) { + rn := manager.GetRemoteNodeFromPeer(peer) + if rn == nil { + return + } + + rn.SendMessage(desoMessage) +} + +func (manager *RemoteNodeIndexerManager) CreateValidatorConnection(netAddr *wire.NetAddress, publicKey bls.PublicKey) { + if netAddr == nil { + return + } + + remoteNode := NewRemoteNode() + remoteNode.CreatePersistentOutboundConnection(netAddr) + manager.GetRemoteNodeIndexer().SetRemoteNode(remoteNode) + manager.GetRemoteNodeIndexer().GetValidatorAttemptedIndex().Add(publicKey, remoteNode) +} + +func (manager *RemoteNodeIndexerManager) CreatePersistentOutboundConnectionNetAddress(netAddr *wire.NetAddress) { + if netAddr == nil { + return + } + + remoteNode := NewRemoteNode() + remoteNode.CreatePersistentOutboundConnection(netAddr) + manager.GetRemoteNodeIndexer().SetRemoteNode(remoteNode) + manager.GetRemoteNodeIndexer().GetNonValidatorAttemptedIndex().Add(remoteNode.GetId(), remoteNode) +} + +func (manager *RemoteNodeIndexerManager) CreateOutboundConnectionNetAddress(netAddr *wire.NetAddress) { + if netAddr == nil { + return + } + + remoteNode := NewRemoteNode() + remoteNode.CreateOutboundConnection(netAddr) + manager.GetRemoteNodeIndexer().SetRemoteNode(remoteNode) + manager.GetRemoteNodeIndexer().GetNonValidatorAttemptedIndex().Add(remoteNode.GetId(), remoteNode) +} + +func (manager *RemoteNodeIndexerManager) AddRemoteNode(remoteNode *RemoteNode) { + manager.GetRemoteNodeIndexer().SetRemoteNode(remoteNode) +} + +func (manager *RemoteNodeIndexerManager) SetValidator(pk bls.PublicKey, rn *RemoteNode) { + if rn == nil { + return + } + manager.GetRemoteNodeIndexer().GetValidatorIndex().Add(pk, rn) + + if rn.IsOutbound() { + manager.GetRemoteNodeIndexer().GetNonValidatorOutboundIndex().Remove(rn.GetId()) + } else if rn.IsInbound() { + manager.GetRemoteNodeIndexer().GetNonValidatorInboundIndex().Remove(rn.GetId()) + } else { + manager.Disconnect(rn) + } +} + +func (manager *RemoteNodeIndexerManager) UnsetValidator(pk bls.PublicKey, rn *RemoteNode) { + if rn == nil { + return + } + manager.GetRemoteNodeIndexer().GetValidatorIndex().Remove(pk) + + if rn.IsOutbound() { + manager.SetNonValidatorOutbound(rn) + } else if rn.IsInbound() { + manager.SetNonValidatorInbound(rn) + } else { + manager.Disconnect(rn) + } +} + +func (manager *RemoteNodeIndexerManager) SetNonValidatorOutbound(rn *RemoteNode) { + if rn == nil || !rn.IsOutbound() { + return + } + + manager.RemoveNonValidatorAttempted(rn.GetId()) + manager.GetRemoteNodeIndexer().GetNonValidatorOutboundIndex().Add(rn.GetId(), rn) +} + +func (manager *RemoteNodeIndexerManager) SetNonValidatorInbound(rn *RemoteNode) { + if rn == nil || !rn.IsInbound() { + return + } + + manager.GetRemoteNodeIndexer().GetNonValidatorInboundIndex().Add(rn.GetId(), rn) +} + +func (manager *RemoteNodeIndexerManager) RemoveNonValidatorAttempted(id RemoteNodeId) { + rn, ok := manager.GetRemoteNodeIndexer().GetNonValidatorAttemptedIndex().Get(id) + if !ok { + return + } + manager.GetRemoteNodeIndexer().RemoveRemoteNode(rn) +} + +func (manager *RemoteNodeIndexerManager) GetAllNonValidators() []*RemoteNode { + outboundRemoteNodes := manager.GetRemoteNodeIndexer().GetNonValidatorOutboundIndex().GetAll() + inboundRemoteNodes := manager.GetRemoteNodeIndexer().GetNonValidatorInboundIndex().GetAll() + return append(outboundRemoteNodes, inboundRemoteNodes...) 
+} + +func (manager *RemoteNodeIndexerManager) GetNumConnectedOutboundPeers() uint32 { + outboundPeers := manager.GetRemoteNodeIndexer().GetNonValidatorOutboundIndex().GetAll() + return uint32(len(outboundPeers)) +} + +func (manager *RemoteNodeIndexerManager) GetNumConnectedInboundPeers() uint32 { + inboundPeers := manager.GetRemoteNodeIndexer().GetNonValidatorInboundIndex().GetAll() + return uint32(len(inboundPeers)) +} + +func (manager *RemoteNodeIndexerManager) GetNumAttemptedNonValidators() uint32 { + attemptedNonValidators := manager.GetRemoteNodeIndexer().GetNonValidatorAttemptedIndex().GetAll() + return uint32(len(attemptedNonValidators)) +} diff --git a/lib/server.go b/lib/server.go index 52fbbd195..14f05ed44 100644 --- a/lib/server.go +++ b/lib/server.go @@ -4,6 +4,8 @@ import ( "bytes" "encoding/hex" "fmt" + "github.com/btcsuite/btcd/wire" + "github.com/deso-protocol/core/consensus" "net" "reflect" "runtime" @@ -17,9 +19,7 @@ import ( "github.com/btcsuite/btcd/addrmgr" chainlib "github.com/btcsuite/btcd/blockchain" - "github.com/btcsuite/btcd/wire" "github.com/davecgh/go-spew/spew" - "github.com/deso-protocol/core/consensus" "github.com/deso-protocol/go-deadlock" "github.com/dgraph-io/badger/v3" "github.com/golang/glog" @@ -61,8 +61,9 @@ type Server struct { eventManager *EventManager TxIndex *TXIndex - handshakeController *HandshakeController - consensusController *ConsensusController + handshakeController *HandshakeController + consensusController *ConsensusController + connectionController *ConnectionController // posMempool *PosMemPool TODO: Add the mempool later // All messages received from peers get sent from the ConnectionManager to the @@ -120,15 +121,6 @@ type Server struct { // requested data but have not yet received a response. requestedTransactionsMap map[BlockHash]*GetDataRequestInfo - // addrsToBroadcast is a list of all the addresses we've received from valid addr - // messages that we intend to broadcast to our peers. It is organized as: - // -> . - // - // It is organized in this way so that we can limit the number of addresses we - // are distributing for a single peer to avoid a DOS attack. - addrsToBroadcastLock deadlock.RWMutex - addrsToBroadcastt map[string][]*SingleAddr - // When set to true, we disable the ConnectionManager DisableNetworking bool @@ -196,6 +188,10 @@ func (srv *Server) GetNumOutboundPeers() uint32 { return srv.cmgr.GetNumOutboundPeers() } +func (srv *Server) IsFromRedundantInboundIPAddress(netAddr *wire.NetAddress) bool { + return srv.cmgr._isFromRedundantInboundIPAddress(netAddr) +} + func (srv *Server) IsFromRedundantOutboundIPAddress(netAddr *wire.NetAddress) bool { return srv.cmgr.isRedundantGroupKey(netAddr) } @@ -212,6 +208,10 @@ func (srv *Server) RemoveAttemptedOutboundAddrs(netAddr *wire.NetAddress) { srv.cmgr.RemoveAttemptedOutboundAddrs(netAddr) } +func (srv *Server) ConnectPeer(conn net.Conn, na *wire.NetAddress, attemptId uint64, isOutbound bool, isPersistent bool) *Peer { + return srv.cmgr.ConnectPeer(conn, na, attemptId, isOutbound, isPersistent) +} + // dataLock must be acquired for writing before calling this function. func (srv *Server) _expireRequests() { // TODO: It could in theory get slow to do brute force iteration over everything @@ -252,7 +252,6 @@ func (srv *Server) GetBlockProducer() *DeSoBlockProducer { return srv.blockProducer } -// TODO: The hallmark of a messy non-law-of-demeter-following interface... 
func (srv *Server) GetConnectionManager() *ConnectionManager { return srv.cmgr } @@ -462,8 +461,7 @@ func NewServer( // Create a new connection manager but note that it won't be initialized until Start(). _incomingMessages := make(chan *ServerMessage, (_targetOutboundPeers+_maxInboundPeers)*3) _cmgr := NewConnectionManager( - _params, _desoAddrMgr, _listeners, _connectIps, timesource, - _targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP, + _params, _listeners, _connectIps, timesource, _hyperSync, _syncType, _stallTimeoutSeconds, _minFeeRateNanosPerKB, _incomingMessages, srv) @@ -498,9 +496,12 @@ func NewServer( hex.EncodeToString(_chain.blockTip().Hash[:]), hex.EncodeToString(BigintToHash(_chain.blockTip().CumWork)[:])) - srv.consensusController = NewConsensusController(_chain, srv) - srv.handshakeController = NewHandshakeController(_chain, srv, _params, _minFeeRateNanosPerKB, - _hyperSync, _blsKeystore) + srv.consensusController = NewConsensusController(_params, _chain, nil, nil) + rniManager := NewRemoteNodeIndexerManager() + + srv.connectionController = NewConnectionController(_params, srv, rniManager, _blsKeystore.GetSigner(), _desoAddrMgr, + _targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP) + srv.handshakeController = NewHandshakeController(rniManager) if srv.stateChangeSyncer != nil { srv.stateChangeSyncer.BlockHeight = uint64(_chain.headerTip().Height) @@ -588,9 +589,6 @@ func NewServer( srv.StartStatsdReporter() } - // Initialize the addrs to broadcast map. - srv.addrsToBroadcastt = make(map[string][]*SingleAddr) - // This will initialize the request queues. srv.ResetRequestQueues() @@ -2171,94 +2169,19 @@ func (srv *Server) StartStatsdReporter() { }() } -func (srv *Server) _handleAddrMessage(pp *Peer, msg *MsgDeSoAddr) { - srv.addrsToBroadcastLock.Lock() - defer srv.addrsToBroadcastLock.Unlock() - - glog.V(1).Infof("Server._handleAddrMessage: Received Addr from peer %v with addrs %v", pp, spew.Sdump(msg.AddrList)) - - // If this addr message contains more than the maximum allowed number of addresses - // then disconnect this peer. - if len(msg.AddrList) > MaxAddrsPerAddrMsg { - glog.Errorf(fmt.Sprintf("Server._handleAddrMessage: Disconnecting "+ - "Peer %v for sending us an addr message with %d transactions, which exceeds "+ - "the max allowed %d", - pp, len(msg.AddrList), MaxAddrsPerAddrMsg)) - pp.Disconnect() - return - } - - // Add all the addresses we received to the addrmgr. - netAddrsReceived := []*wire.NetAddress{} - for _, addr := range msg.AddrList { - addrAsNetAddr := wire.NewNetAddressIPPort(addr.IP, addr.Port, (wire.ServiceFlag)(addr.Services)) - if !addrmgr.IsRoutable(addrAsNetAddr) { - glog.V(1).Infof("Dropping address %v from peer %v because it is not routable", addr, pp) - continue - } - - netAddrsReceived = append( - netAddrsReceived, addrAsNetAddr) - } - srv.cmgr.AddrMgr.AddAddresses(netAddrsReceived, pp.netAddr) - - // If the message had <= 10 addrs in it, then queue all the addresses for relaying - // on the next cycle. 
- if len(msg.AddrList) <= 10 { - glog.V(1).Infof("Server._handleAddrMessage: Queueing %d addrs for forwarding from "+ - "peer %v", len(msg.AddrList), pp) - sourceAddr := &SingleAddr{ - Timestamp: time.Now(), - IP: pp.netAddr.IP, - Port: pp.netAddr.Port, - Services: pp.serviceFlags, - } - listToAddTo, hasSeenSource := srv.addrsToBroadcastt[sourceAddr.StringWithPort(false /*includePort*/)] - if !hasSeenSource { - listToAddTo = []*SingleAddr{} - } - // If this peer has been sending us a lot of little crap, evict a lot of their - // stuff but don't disconnect. - if len(listToAddTo) > MaxAddrsPerAddrMsg { - listToAddTo = listToAddTo[:MaxAddrsPerAddrMsg/2] - } - listToAddTo = append(listToAddTo, msg.AddrList...) - srv.addrsToBroadcastt[sourceAddr.StringWithPort(false /*includePort*/)] = listToAddTo - } -} - -func (srv *Server) _handleGetAddrMessage(pp *Peer, msg *MsgDeSoGetAddr) { - glog.V(1).Infof("Server._handleGetAddrMessage: Received GetAddr from peer %v", pp) - // When we get a GetAddr message, choose MaxAddrsPerMsg from the AddrMgr - // and send them back to the peer. - netAddrsFound := srv.cmgr.AddrMgr.AddressCache() - if len(netAddrsFound) > MaxAddrsPerAddrMsg { - netAddrsFound = netAddrsFound[:MaxAddrsPerAddrMsg] - } - - // Convert the list to a SingleAddr list. - res := &MsgDeSoAddr{} - for _, netAddr := range netAddrsFound { - singleAddr := &SingleAddr{ - Timestamp: time.Now(), - IP: netAddr.IP, - Port: netAddr.Port, - Services: (ServiceFlag)(netAddr.Services), - } - res.AddrList = append(res.AddrList, singleAddr) - } - pp.AddDeSoMessage(res, false) -} - func (srv *Server) _handleControlMessages(serverMessage *ServerMessage) (_shouldQuit bool) { switch serverMessage.Msg.(type) { // Control messages used internally to signal to the server. case *MsgDeSoNewPeer: - srv.handshakeController._handleNewPeerMessage(serverMessage.Peer, serverMessage.Msg) + break case *MsgDeSoHandshakePeer: srv._handleHandshakePeer(serverMessage.Peer) + srv.handshakeController._handleHandshakePeerMessage(serverMessage.Peer, serverMessage.Msg) case *MsgDeSoDonePeer: srv._handleDonePeer(serverMessage.Peer) + srv.connectionController._handleDonePeerMessage(serverMessage.Peer, serverMessage.Msg) + case *MsgDeSoNewConnection: + srv.connectionController._handleNewConnectionMessage(serverMessage.Peer, serverMessage.Msg) case *MsgDeSoQuit: return true } @@ -2270,6 +2193,10 @@ func (srv *Server) _handlePeerMessages(serverMessage *ServerMessage) { // Handle all non-control message types from our Peers. switch msg := serverMessage.Msg.(type) { // Messages sent among peers. + case *MsgDeSoAddr: + srv.connectionController._handleAddrMessage(serverMessage.Peer, serverMessage.Msg) + case *MsgDeSoGetAddr: + srv.connectionController._handleGetAddrMessage(serverMessage.Peer, serverMessage.Msg) case *MsgDeSoGetHeaders: srv._handleGetHeaders(serverMessage.Peer, msg) case *MsgDeSoHeaderBundle: @@ -2356,20 +2283,6 @@ func (srv *Server) _startConsensus() { glog.V(2).Infof("Server._startConsensus: Handling message of type %v from Peer %v", serverMessage.Msg.GetMsgType(), serverMessage.Peer) - - // If the message is an addr message we handle it independent of whether or - // not the BitcoinManager is synced. - if serverMessage.Msg.GetMsgType() == MsgTypeAddr { - srv._handleAddrMessage(serverMessage.Peer, serverMessage.Msg.(*MsgDeSoAddr)) - continue - } - // If the message is a GetAddr message we handle it independent of whether or - // not the BitcoinManager is synced. 
- if serverMessage.Msg.GetMsgType() == MsgTypeGetAddr { - srv._handleGetAddrMessage(serverMessage.Peer, serverMessage.Msg.(*MsgDeSoGetAddr)) - continue - } - srv._handlePeerMessages(serverMessage) // Always check for and handle control messages regardless of whether the @@ -2390,96 +2303,6 @@ func (srv *Server) _startConsensus() { glog.V(2).Info("Server.Start: Server done") } -func (srv *Server) _getAddrsToBroadcast() []*SingleAddr { - srv.addrsToBroadcastLock.Lock() - defer srv.addrsToBroadcastLock.Unlock() - - // If there's nothing in the map, return. - if len(srv.addrsToBroadcastt) == 0 { - return []*SingleAddr{} - } - - // If we get here then we have some addresses to broadcast. - addrsToBroadcast := []*SingleAddr{} - for len(addrsToBroadcast) < 10 && len(srv.addrsToBroadcastt) > 0 { - // Choose a key at random. This works because map iteration is random in golang. - bucket := "" - for kk := range srv.addrsToBroadcastt { - bucket = kk - break - } - - // Remove the last element from the slice for the given bucket. - currentAddrList := srv.addrsToBroadcastt[bucket] - if len(currentAddrList) > 0 { - lastIndex := len(currentAddrList) - 1 - currentAddr := currentAddrList[lastIndex] - currentAddrList = currentAddrList[:lastIndex] - if len(currentAddrList) == 0 { - delete(srv.addrsToBroadcastt, bucket) - } else { - srv.addrsToBroadcastt[bucket] = currentAddrList - } - - addrsToBroadcast = append(addrsToBroadcast, currentAddr) - } - } - - return addrsToBroadcast -} - -// Must be run inside a goroutine. Relays addresses to peers at regular intervals -// and relays our own address to peers once every 24 hours. -func (srv *Server) _startAddressRelayer() { - for numMinutesPassed := 0; ; numMinutesPassed++ { - if atomic.LoadInt32(&srv.shutdown) >= 1 { - break - } - // For the first ten minutes after the server starts, relay our address to all - // peers. After the first ten minutes, do it once every 24 hours. - glog.V(1).Infof("Server.Start._startAddressRelayer: Relaying our own addr to peers") - if numMinutesPassed < 10 || numMinutesPassed%(RebroadcastNodeAddrIntervalMinutes) == 0 { - for _, pp := range srv.cmgr.GetAllPeers() { - bestAddress := srv.cmgr.AddrMgr.GetBestLocalAddress(pp.netAddr) - if bestAddress != nil { - glog.V(2).Infof("Server.Start._startAddressRelayer: Relaying address %v to "+ - "peer %v", bestAddress.IP.String(), pp) - pp.AddDeSoMessage(&MsgDeSoAddr{ - AddrList: []*SingleAddr{ - { - Timestamp: time.Now(), - IP: bestAddress.IP, - Port: bestAddress.Port, - Services: (ServiceFlag)(bestAddress.Services), - }, - }, - }, false) - } - } - } - - glog.V(2).Infof("Server.Start._startAddressRelayer: Seeing if there are addrs to relay...") - // Broadcast the addrs we have to all of our peers. - addrsToBroadcast := srv._getAddrsToBroadcast() - if len(addrsToBroadcast) == 0 { - glog.V(2).Infof("Server.Start._startAddressRelayer: No addrs to relay.") - time.Sleep(AddrRelayIntervalSeconds * time.Second) - continue - } - - glog.V(2).Infof("Server.Start._startAddressRelayer: Found %d addrs to "+ - "relay: %v", len(addrsToBroadcast), spew.Sdump(addrsToBroadcast)) - // Iterate over all our peers and broadcast the addrs to all of them. - for _, pp := range srv.cmgr.GetAllPeers() { - pp.AddDeSoMessage(&MsgDeSoAddr{ - AddrList: addrsToBroadcast, - }, false) - } - time.Sleep(AddrRelayIntervalSeconds * time.Second) - continue - } -} - func (srv *Server) _startTransactionRelayer() { // If we've set a maximum sync height, we will not relay transactions. 
if srv.blockchain.MaxSyncBlockHeight > 0 { @@ -2590,8 +2413,6 @@ func (srv *Server) Start() { go srv._startConsensus() - go srv._startAddressRelayer() - go srv._startTransactionRelayer() // Once the ConnectionManager is started, peers will be found and connected to and