diff --git a/integration_testing/blocksync_test.go b/integration_testing/blocksync_test.go index 8be96d735..d55124c13 100644 --- a/integration_testing/blocksync_test.go +++ b/integration_testing/blocksync_test.go @@ -40,9 +40,7 @@ func TestSimpleBlockSync(t *testing.T) { // wait for node1 to sync blocks waitForNodeToFullySync(node1) - // bridge the nodes together. - bridge := NewConnectionBridge(node1, node2) - require.NoError(bridge.Start()) + // TODO: Dial an outbound connection from node2 to node1 // wait for node2 to sync blocks. waitForNodeToFullySync(node2) @@ -99,6 +97,7 @@ func TestSimpleSyncRestart(t *testing.T) { compareNodesByDB(t, node1, node2, 0) fmt.Println("Random restart successful! Random height was", randomHeight) fmt.Println("Databases match!") + bridge.Disconnect() node1.Stop() node2.Stop() } @@ -153,7 +152,7 @@ func TestSimpleSyncDisconnectWithSwitchingToNewPeer(t *testing.T) { randomHeight := randomUint32Between(t, 10, config2.MaxSyncBlockHeight) fmt.Println("Random height for a restart (re-use if test failed):", randomHeight) - disconnectAtBlockHeight(t, node2, bridge12, randomHeight) + disconnectAtBlockHeight(node2, bridge12, randomHeight) // bridge the nodes together. bridge23 := NewConnectionBridge(node2, node3) @@ -167,6 +166,8 @@ func TestSimpleSyncDisconnectWithSwitchingToNewPeer(t *testing.T) { compareNodesByDB(t, node3, node2, 0) fmt.Println("Random restart successful! Random height was", randomHeight) fmt.Println("Databases match!") + bridge12.Disconnect() + bridge23.Disconnect() node1.Stop() node2.Stop() node3.Stop() diff --git a/integration_testing/connection_bridge.go b/integration_testing/connection_bridge.go index 4c3b28dde..33412dd72 100644 --- a/integration_testing/connection_bridge.go +++ b/integration_testing/connection_bridge.go @@ -111,13 +111,14 @@ func (bridge *ConnectionBridge) createInboundConnection(node *cmd.Node) *lib.Pee } // This channel is redundant in our setting. - messagesFromPeer := make(chan *lib.ServerMessage) + messagesFromPeer := make(chan *lib.ServerMessage, 100) + newPeerChan := make(chan *lib.Peer, 100) + donePeerChan := make(chan *lib.Peer, 100) // Because it is an inbound Peer of the node, it is simultaneously a "fake" outbound Peer of the bridge. // Hence, we will mark the _isOutbound parameter as "true" in NewPeer. 
- peer := lib.NewPeer(conn, true, netAddress, true, - 10000, 0, &lib.DeSoMainnetParams, - messagesFromPeer, nil, nil, lib.NodeSyncTypeAny) - peer.ID = uint64(lib.RandInt64(math.MaxInt64)) + peer := lib.NewPeer(uint64(lib.RandInt64(math.MaxInt64)), conn, true, + netAddress, true, 10000, 0, &lib.DeSoMainnetParams, + messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, newPeerChan, donePeerChan) return peer } @@ -139,27 +140,30 @@ func (bridge *ConnectionBridge) createOutboundConnection(node *cmd.Node, otherNo fmt.Println("createOutboundConnection: Got a connection from remote:", conn.RemoteAddr().String(), "on listener:", ll.Addr().String()) - na, err := lib.IPToNetAddr(conn.RemoteAddr().String(), otherNode.Server.GetConnectionManager().AddrMgr, - otherNode.Params) - messagesFromPeer := make(chan *lib.ServerMessage) - peer := lib.NewPeer(conn, false, na, false, - 10000, 0, bridge.nodeB.Params, - messagesFromPeer, nil, nil, lib.NodeSyncTypeAny) - peer.ID = uint64(lib.RandInt64(math.MaxInt64)) + addrMgr := addrmgr.New("", net.LookupIP) + na, err := lib.IPToNetAddr(conn.RemoteAddr().String(), addrMgr, otherNode.Params) + messagesFromPeer := make(chan *lib.ServerMessage, 100) + newPeerChan := make(chan *lib.Peer, 100) + donePeerChan := make(chan *lib.Peer, 100) + peer := lib.NewPeer(uint64(lib.RandInt64(math.MaxInt64)), conn, + false, na, false, 10000, 0, bridge.nodeB.Params, + messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, newPeerChan, donePeerChan) bridge.newPeerChan <- peer //} }(ll) // Make the provided node to make an outbound connection to our listener. - netAddress, _ := lib.IPToNetAddr(ll.Addr().String(), addrmgr.New("", net.LookupIP), &lib.DeSoMainnetParams) - fmt.Println("createOutboundConnection: IP:", netAddress.IP, "Port:", netAddress.Port) - go node.Server.GetConnectionManager().ConnectPeer(nil, netAddress) + //netAddress, _ := lib.IPToNetAddr(ll.Addr().String(), addrmgr.New("", net.LookupIP), &lib.DeSoMainnetParams) + //fmt.Println("createOutboundConnection: IP:", netAddress.IP, "Port:", netAddress.Port) + addrMgr := addrmgr.New("", net.LookupIP) + addr, _ := lib.IPToNetAddr(ll.Addr().String(), addrMgr, node.Params) + go node.Server.GetConnectionManager().DialOutboundConnection(addr, uint64(lib.RandInt64(math.MaxInt64))) } // getVersionMessage simulates a version message that the provided node would have sent. 
func (bridge *ConnectionBridge) getVersionMessage(node *cmd.Node) *lib.MsgDeSoVersion { ver := lib.NewMessage(lib.MsgTypeVersion).(*lib.MsgDeSoVersion) - ver.Version = node.Params.ProtocolVersion + ver.Version = node.Params.ProtocolVersion.ToUint64() ver.TstampSecs = time.Now().Unix() ver.Nonce = uint64(lib.RandInt64(math.MaxInt64)) ver.UserAgent = node.Params.UserAgent @@ -172,12 +176,29 @@ func (bridge *ConnectionBridge) getVersionMessage(node *cmd.Node) *lib.MsgDeSoVe } if node.Server != nil { - ver.LatestBlockHeight = uint32(node.Server.GetBlockchain().BlockTip().Header.Height) + ver.LatestBlockHeight = node.Server.GetBlockchain().BlockTip().Header.Height } ver.MinFeeRateNanosPerKB = node.Config.MinFeerate return ver } +func ReadWithTimeout(readFunc func() error, readTimeout time.Duration) error { + errChan := make(chan error) + go func() { + errChan <- readFunc() + }() + select { + case err := <-errChan: + { + return err + } + case <-time.After(readTimeout): + { + return fmt.Errorf("ReadWithTimeout: Timed out reading message") + } + } +} + // startConnection starts the connection by performing version and verack exchange with // the provided connection, pretending to be the otherNode. func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode *cmd.Node) error { @@ -192,7 +213,7 @@ func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode } // Wait for a response to the version message. - if err := connection.ReadWithTimeout( + if err := ReadWithTimeout( func() error { msg, err := connection.ReadDeSoMessage() if err != nil { @@ -215,7 +236,7 @@ func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode // Now prepare the verack message. verackMsg := lib.NewMessage(lib.MsgTypeVerack) - verackMsg.(*lib.MsgDeSoVerack).Nonce = connection.VersionNonceReceived + verackMsg.(*lib.MsgDeSoVerack).NonceReceived = connection.VersionNonceReceived // And send it to the connection. if err := connection.WriteDeSoMessage(verackMsg); err != nil { @@ -223,7 +244,7 @@ func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode } // And finally wait for connection's response to the verack message. - if err := connection.ReadWithTimeout( + if err := ReadWithTimeout( func() error { msg, err := connection.ReadDeSoMessage() if err != nil { @@ -234,9 +255,9 @@ func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode return fmt.Errorf("message is not verack! 
Type: %v", msg.GetMsgType()) } verackMsg := msg.(*lib.MsgDeSoVerack) - if verackMsg.Nonce != connection.VersionNonceSent { + if verackMsg.NonceReceived != connection.VersionNonceSent { return fmt.Errorf("verack message nonce doesn't match (received: %v, sent: %v)", - verackMsg.Nonce, connection.VersionNonceSent) + verackMsg.NonceReceived, connection.VersionNonceSent) } return nil }, lib.DeSoMainnetParams.VersionNegotiationTimeout); err != nil { diff --git a/integration_testing/hypersync_test.go b/integration_testing/hypersync_test.go index aad90ee0e..bc4c8a7c0 100644 --- a/integration_testing/hypersync_test.go +++ b/integration_testing/hypersync_test.go @@ -53,6 +53,7 @@ func TestSimpleHyperSync(t *testing.T) { //compareNodesByDB(t, node1, node2, 0) compareNodesByChecksum(t, node1, node2) fmt.Println("Databases match!") + bridge.Disconnect() node1.Stop() node2.Stop() } @@ -122,6 +123,8 @@ func TestHyperSyncFromHyperSyncedNode(t *testing.T) { compareNodesByChecksum(t, node2, node3) fmt.Println("Databases match!") + bridge12.Disconnect() + bridge23.Disconnect() node1.Stop() node2.Stop() node3.Stop() @@ -178,6 +181,7 @@ func TestSimpleHyperSyncRestart(t *testing.T) { compareNodesByChecksum(t, node1, node2) fmt.Println("Random restart successful! Random sync prefix was", syncPrefix) fmt.Println("Databases match!") + bridge.Disconnect() node1.Stop() node2.Stop() } @@ -255,6 +259,8 @@ func TestSimpleHyperSyncDisconnectWithSwitchingToNewPeer(t *testing.T) { compareNodesByChecksum(t, node1, node2) fmt.Println("Random restart successful! Random sync prefix was", syncPrefix) fmt.Println("Databases match!") + bridge12.Disconnect() + bridge23.Disconnect() node1.Stop() node2.Stop() node3.Stop() @@ -349,6 +355,7 @@ func TestArchivalMode(t *testing.T) { //compareNodesByDB(t, node1, node2, 0) compareNodesByChecksum(t, node1, node2) fmt.Println("Databases match!") + bridge.Disconnect() node1.Stop() node2.Stop() } @@ -406,6 +413,9 @@ func TestBlockSyncFromArchivalModeHyperSync(t *testing.T) { //compareNodesByDB(t, node1, node2, 0) compareNodesByChecksum(t, node1, node2) fmt.Println("Databases match!") + bridge12.Disconnect() + bridge23.Disconnect() node1.Stop() node2.Stop() + node3.Stop() } diff --git a/integration_testing/migrations_test.go b/integration_testing/migrations_test.go index b0a692b52..1419d483e 100644 --- a/integration_testing/migrations_test.go +++ b/integration_testing/migrations_test.go @@ -59,6 +59,7 @@ func TestEncoderMigrations(t *testing.T) { compareNodesByChecksum(t, node1, node2) fmt.Println("Databases match!") + bridge.Disconnect() node1.Stop() node2.Stop() } diff --git a/integration_testing/mining_test.go b/integration_testing/mining_test.go index 49a23333c..88de5e097 100644 --- a/integration_testing/mining_test.go +++ b/integration_testing/mining_test.go @@ -29,9 +29,7 @@ func TestRegtestMiner(t *testing.T) { // wait for node1 to sync blocks mineHeight := uint32(40) - listener := make(chan bool) - listenForBlockHeight(t, node1, mineHeight, listener) - <-listener + <-listenForBlockHeight(node1, mineHeight) node1.Stop() } diff --git a/integration_testing/networking_test.go b/integration_testing/networking_test.go new file mode 100644 index 000000000..bbe9063ee --- /dev/null +++ b/integration_testing/networking_test.go @@ -0,0 +1,38 @@ +package integration_testing + +import ( + "github.com/deso-protocol/core/cmd" + "github.com/deso-protocol/core/lib" + "github.com/stretchr/testify/require" + "os" + "testing" + "time" +) + +func TestSimpleConnectDisconnect(t *testing.T) { + require := 
require.New(t) + _ = require + + dbDir1 := getDirectory(t) + defer os.RemoveAll(dbDir1) + + config1 := generateConfig(t, 18000, dbDir1, 10) + config1.SyncType = lib.NodeSyncTypeBlockSync + config1.MaxSyncBlockHeight = 100 + node1 := cmd.NewNode(config1) + node1 = startNode(t, node1) + + // TODO: connect node1 to deso-seed-2.io + node1.Server.GetConnectionController().SetTargetOutboundPeers(0) + + <-listenForBlockHeight(node1, 50) + // TODO: close connection + time.Sleep(3 * time.Second) + peers := node1.Server.GetConnectionManager().GetAllPeers() + for _, peer := range peers { + if peer.ID == 1 { + t.Fatalf("Should have disconnected from peer 1") + } + } + node1.Stop() +} diff --git a/integration_testing/remote_node_test.go b/integration_testing/remote_node_test.go new file mode 100644 index 000000000..a869ee88d --- /dev/null +++ b/integration_testing/remote_node_test.go @@ -0,0 +1,234 @@ +package integration_testing + +import ( + "fmt" + "github.com/btcsuite/btcd/addrmgr" + "github.com/deso-protocol/core/bls" + "github.com/deso-protocol/core/cmd" + "github.com/deso-protocol/core/lib" + "github.com/stretchr/testify/require" + "net" + "os" + "testing" +) + +func TestOutboundConnectionNonValidators(t *testing.T) { + require := require.New(t) + _ = require + + dbDir1 := getDirectory(t) + dbDir2 := getDirectory(t) + defer os.RemoveAll(dbDir1) + defer os.RemoveAll(dbDir2) + + config1 := generateConfig(t, 18000, dbDir1, 10) + config1.SyncType = lib.NodeSyncTypeBlockSync + config2 := generateConfig(t, 18001, dbDir2, 10) + config2.SyncType = lib.NodeSyncTypeBlockSync + + node1 := cmd.NewNode(config1) + node2 := cmd.NewNode(config2) + node1.Params.UserAgent = "Node1" + node2.Params.UserAgent = "Node2" + + node1 = startNode(t, node1) + node2 = startNode(t, node2) + defer node1.Stop() + defer node2.Stop() + + addrMgr := addrmgr.New("", net.LookupIP) + addr, err := lib.IPToNetAddr(node2.Listeners[0].Addr().String(), addrMgr, node1.Params) + require.NoError(err) + rnManagerN1 := node1.Server.GetConnectionController().GetRemoteNodeManager() + require.NoError(rnManagerN1.CreateNonValidatorOutboundConnection(addr)) + waitForNonValidatorOutboundConnection(t, node1, node2) + waitForNonValidatorInboundConnection(t, node2, node1) +} + +func TestOutboundConnectionValidators(t *testing.T) { + require := require.New(t) + _ = require + + dbDir1 := getDirectory(t) + dbDir2 := getDirectory(t) + defer os.RemoveAll(dbDir1) + defer os.RemoveAll(dbDir2) + + config1 := generateConfig(t, 18000, dbDir1, 10) + config1.SyncType = lib.NodeSyncTypeBlockSync + blsPriv1, err := bls.NewPrivateKey() + require.NoError(err) + config1.PosValidatorSeed = blsPriv1.ToString() + config2 := generateConfig(t, 18001, dbDir2, 10) + config2.SyncType = lib.NodeSyncTypeBlockSync + blsPriv2, err := bls.NewPrivateKey() + blsPub2 := blsPriv2.PublicKey() + require.NoError(err) + config2.PosValidatorSeed = blsPriv2.ToString() + + node1 := cmd.NewNode(config1) + node2 := cmd.NewNode(config2) + node1.Params.UserAgent = "Node1" + node1.Params.ProtocolVersion = lib.ProtocolVersion2 + node2.Params.UserAgent = "Node2" + node2.Params.ProtocolVersion = lib.ProtocolVersion2 + + node1 = startNode(t, node1) + node2 = startNode(t, node2) + defer node1.Stop() + defer node2.Stop() + + addrMgr := addrmgr.New("", net.LookupIP) + addr, err := lib.IPToNetAddr(node2.Listeners[0].Addr().String(), addrMgr, node1.Params) + require.NoError(err) + rnManagerN1 := node1.Server.GetConnectionController().GetRemoteNodeManager() + 
require.NoError(rnManagerN1.CreateValidatorConnection(addr, blsPub2)) + waitForValidatorConnection(t, node1, node2) + waitForValidatorConnection(t, node2, node1) +} + +func TestOutboundConnectionValidatorsAndNonValidators(t *testing.T) { + require := require.New(t) + _ = require + + dbDir1 := getDirectory(t) + dbDir2 := getDirectory(t) + defer os.RemoveAll(dbDir1) + defer os.RemoveAll(dbDir2) + + config1 := generateConfig(t, 18000, dbDir1, 10) + config1.SyncType = lib.NodeSyncTypeBlockSync + blsPriv1, err := bls.NewPrivateKey() + require.NoError(err) + config1.PosValidatorSeed = blsPriv1.ToString() + config2 := generateConfig(t, 18001, dbDir2, 10) + config2.SyncType = lib.NodeSyncTypeBlockSync + + node1 := cmd.NewNode(config1) + node2 := cmd.NewNode(config2) + node1.Params.UserAgent = "Node1" + node1.Params.ProtocolVersion = lib.ProtocolVersion2 + node2.Params.UserAgent = "Node2" + node2.Params.ProtocolVersion = lib.ProtocolVersion1 + + node1 = startNode(t, node1) + node2 = startNode(t, node2) + defer node1.Stop() + defer node2.Stop() + + addrMgr := addrmgr.New("", net.LookupIP) + addr, err := lib.IPToNetAddr(node2.Listeners[0].Addr().String(), addrMgr, node1.Params) + require.NoError(err) + rnManagerN1 := node1.Server.GetConnectionController().GetRemoteNodeManager() + require.NoError(rnManagerN1.CreateNonValidatorOutboundConnection(addr)) + waitForNonValidatorOutboundConnection(t, node1, node2) + waitForValidatorConnection(t, node2, node1) +} + +func waitForValidatorConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) { + userAgentN1 := node1.Params.UserAgent + userAgentN2 := node2.Params.UserAgent + rnManagerN1 := node1.Server.GetConnectionController().GetRemoteNodeManager() + n1ValidatedN2 := func() bool { + if true != checkRemoteNodeIndexerUserAgent(rnManagerN1, userAgentN2, true, false, false) { + return false + } + rnFromN2 := getRemoteNodeWithUserAgent(node1, userAgentN2) + if rnFromN2 == nil { + return false + } + if !rnFromN2.IsHandshakeCompleted() { + return false + } + return true + } + waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to outbound non-validator Node (%s)", userAgentN1, userAgentN2), n1ValidatedN2) +} + +func waitForNonValidatorOutboundConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) { + userAgentN1 := node1.Params.UserAgent + userAgentN2 := node2.Params.UserAgent + rnManagerN1 := node1.Server.GetConnectionController().GetRemoteNodeManager() + n1ValidatedN2 := func() bool { + if true != checkRemoteNodeIndexerUserAgent(rnManagerN1, userAgentN2, false, true, false) { + return false + } + rnFromN2 := getRemoteNodeWithUserAgent(node1, userAgentN2) + if rnFromN2 == nil { + return false + } + if !rnFromN2.IsHandshakeCompleted() { + return false + } + if rnFromN2.GetValidatorPublicKey() != nil { + return false + } + return true + } + waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to outbound non-validator Node (%s)", userAgentN1, userAgentN2), n1ValidatedN2) +} + +func waitForNonValidatorInboundConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) { + userAgentN1 := node1.Params.UserAgent + userAgentN2 := node2.Params.UserAgent + rnManagerN1 := node1.Server.GetConnectionController().GetRemoteNodeManager() + n1ValidatedN2 := func() bool { + if true != checkRemoteNodeIndexerUserAgent(rnManagerN1, userAgentN2, false, false, true) { + return false + } + rnFromN2 := getRemoteNodeWithUserAgent(node1, userAgentN2) + if rnFromN2 == nil { + return false + } + if !rnFromN2.IsHandshakeCompleted() { + return false + } + if 
rnFromN2.GetValidatorPublicKey() != nil { + return false + } + return true + } + waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to inbound non-validator Node (%s)", userAgentN1, userAgentN2), n1ValidatedN2) +} + +func checkRemoteNodeIndexerUserAgent(manager *lib.RemoteNodeManager, userAgent string, validator bool, + nonValidatorOutbound bool, nonValidatorInbound bool) bool { + + if true != checkUserAgentInRemoteNodeList(userAgent, manager.GetAllRemoteNodes().GetAll()) { + return false + } + if validator != checkUserAgentInRemoteNodeList(userAgent, manager.GetValidatorIndex().GetAll()) { + return false + } + if nonValidatorOutbound != checkUserAgentInRemoteNodeList(userAgent, manager.GetNonValidatorOutboundIndex().GetAll()) { + return false + } + if nonValidatorInbound != checkUserAgentInRemoteNodeList(userAgent, manager.GetNonValidatorInboundIndex().GetAll()) { + return false + } + + return true +} + +func checkUserAgentInRemoteNodeList(userAgent string, rnList []*lib.RemoteNode) bool { + for _, rn := range rnList { + if rn == nil { + continue + } + if rn.GetUserAgent() == userAgent { + return true + } + } + return false +} + +func getRemoteNodeWithUserAgent(node *cmd.Node, userAgent string) *lib.RemoteNode { + rnManager := node.Server.GetConnectionController().GetRemoteNodeManager() + rnList := rnManager.GetAllRemoteNodes().GetAll() + for _, rn := range rnList { + if rn.GetUserAgent() == userAgent { + return rn + } + } + return nil +} diff --git a/integration_testing/tools.go b/integration_testing/tools.go index c73b82873..033b608b3 100644 --- a/integration_testing/tools.go +++ b/integration_testing/tools.go @@ -69,7 +69,7 @@ func generateConfig(t *testing.T, port uint32, dataDir string, maxPeers uint32) config.MaxSyncBlockHeight = 0 config.ConnectIPs = []string{} config.PrivateMode = true - config.GlogV = 0 + config.GlogV = 2 config.GlogVmodule = "*bitcoin_manager*=0,*balance*=0,*view*=0,*frontend*=0,*peer*=0,*addr*=0,*network*=0,*utils*=0,*connection*=0,*main*=0,*server*=0,*mempool*=0,*miner*=0,*blockchain*=0" config.MaxInboundPeers = maxPeers config.TargetOutboundPeers = maxPeers @@ -82,6 +82,7 @@ func generateConfig(t *testing.T, port uint32, dataDir string, maxPeers uint32) config.SnapshotBlockHeightPeriod = HyperSyncSnapshotPeriod config.MaxSyncBlockHeight = MaxSyncBlockHeight config.SyncType = lib.NodeSyncTypeBlockSync + config.PosValidatorSeed = "0x023f477056acd6807fd43958d2f8497c3b3522691173c2c4c65b8a3ecbc7db94" //config.ArchivalMode = true return config @@ -150,7 +151,8 @@ func compareNodesByChecksum(t *testing.T, nodeA *cmd.Node, nodeB *cmd.Node) { // compareNodesByState will look through all state records in nodeA and nodeB databases and will compare them. // The nodes pass this comparison iff they have identical states. func compareNodesByState(t *testing.T, nodeA *cmd.Node, nodeB *cmd.Node, verbose int) { - compareNodesByStateWithPrefixList(t, nodeA.ChainDB, nodeB.ChainDB, lib.StatePrefixes.StatePrefixesList, verbose) + compareNodesByStateWithPrefixList(t, nodeA.Server.GetBlockchain().DB(), nodeB.Server.GetBlockchain().DB(), + lib.StatePrefixes.StatePrefixesList, verbose) } // compareNodesByDB will look through all records in nodeA and nodeB databases and will compare them. 
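The wait helpers above all follow the same pattern: poll the RemoteNodeManager indexes until a condition holds, via the waitForCondition helper added to tools.go below. A minimal sketch of that pattern, assuming the helpers defined in this patch (the name waitForHandshakeWithUserAgent is hypothetical):

// Sketch: poll until `node` has a handshake-completed RemoteNode whose user
// agent matches `userAgent`, reusing getRemoteNodeWithUserAgent from above
// and waitForCondition from tools.go.
func waitForHandshakeWithUserAgent(t *testing.T, node *cmd.Node, userAgent string) {
	waitForCondition(t, fmt.Sprintf("Waiting for handshake with (%s)", userAgent), func() bool {
		rn := getRemoteNodeWithUserAgent(node, userAgent)
		return rn != nil && rn.IsHandshakeCompleted()
	})
}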
@@ -164,7 +166,8 @@ func compareNodesByDB(t *testing.T, nodeA *cmd.Node, nodeB *cmd.Node, verbose in } prefixList = append(prefixList, []byte{prefix}) } - compareNodesByStateWithPrefixList(t, nodeA.ChainDB, nodeB.ChainDB, prefixList, verbose) + compareNodesByStateWithPrefixList(t, nodeA.Server.GetBlockchain().DB(), nodeB.Server.GetBlockchain().DB(), + prefixList, verbose) } // compareNodesByDB will look through all records in nodeA and nodeB txindex databases and will compare them. @@ -386,25 +389,25 @@ func restartNode(t *testing.T, node *cmd.Node) *cmd.Node { } // listenForBlockHeight busy-waits until the node's block tip reaches provided height. -func listenForBlockHeight(t *testing.T, node *cmd.Node, height uint32, signal chan<- bool) { +func listenForBlockHeight(node *cmd.Node, height uint32) (_listener chan bool) { + listener := make(chan bool) ticker := time.NewTicker(1 * time.Millisecond) go func() { for { <-ticker.C if node.Server.GetBlockchain().BlockTip().Height >= height { - signal <- true + listener <- true break } } }() + return listener } // disconnectAtBlockHeight busy-waits until the node's block tip reaches provided height, and then disconnects // from the provided bridge. -func disconnectAtBlockHeight(t *testing.T, syncingNode *cmd.Node, bridge *ConnectionBridge, height uint32) { - listener := make(chan bool) - listenForBlockHeight(t, syncingNode, height, listener) - <-listener +func disconnectAtBlockHeight(syncingNode *cmd.Node, bridge *ConnectionBridge, height uint32) { + <-listenForBlockHeight(syncingNode, height) bridge.Disconnect() } @@ -414,7 +417,7 @@ func restartAtHeightAndReconnectNode(t *testing.T, node *cmd.Node, source *cmd.N height uint32) (_node *cmd.Node, _bridge *ConnectionBridge) { require := require.New(t) - disconnectAtBlockHeight(t, node, currentBridge, height) + disconnectAtBlockHeight(node, currentBridge, height) newNode := restartNode(t, node) // Wait after the restart. time.Sleep(1 * time.Second) @@ -475,3 +478,23 @@ func randomUint32Between(t *testing.T, min, max uint32) uint32 { randomHeight := uint32(randomNumber) % (max - min) return randomHeight + min } + +func waitForCondition(t *testing.T, id string, condition func() bool) { + signalChan := make(chan struct{}) + go func() { + for { + if condition() { + signalChan <- struct{}{} + return + } + time.Sleep(1 * time.Millisecond) + } + }() + + select { + case <-signalChan: + return + case <-time.After(5 * time.Second): + t.Fatalf("Condition timed out | %s", id) + } +} diff --git a/integration_testing/txindex_test.go b/integration_testing/txindex_test.go index aa13fd265..dfd398557 100644 --- a/integration_testing/txindex_test.go +++ b/integration_testing/txindex_test.go @@ -57,6 +57,7 @@ func TestSimpleTxIndex(t *testing.T) { compareNodesByDB(t, node1, node2, 0) compareNodesByTxIndex(t, node1, node2, 0) fmt.Println("Databases match!") + bridge.Disconnect() node1.Stop() node2.Stop() } diff --git a/lib/connection_manager.go b/lib/connection_manager.go index 38924bdf9..6838d926d 100644 --- a/lib/connection_manager.go +++ b/lib/connection_manager.go @@ -4,7 +4,6 @@ import ( "fmt" "math" "net" - "strconv" "sync" "sync/atomic" "time" @@ -14,7 +13,6 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/decred/dcrd/lru" "github.com/golang/glog" - "github.com/pkg/errors" ) // connection_manager.go contains most of the logic for creating and managing @@ -36,24 +34,10 @@ type ConnectionManager struct { // doesn't need a reference to the Server object. But for now we keep things lazy. 
srv *Server - // When --connectips is set, we don't connect to anything from the addrmgr. - connectIps []string - - // The address manager keeps track of peer addresses we're aware of. When - // we need to connect to a new outbound peer, it chooses one of the addresses - // it's aware of at random and provides it to us. - AddrMgr *addrmgr.AddrManager // The interfaces we listen on for new incoming connections. listeners []net.Listener // The parameters we are initialized with. params *DeSoParams - // The target number of outbound peers we want to have. - targetOutboundPeers uint32 - // The maximum number of inbound peers we allow. - maxInboundPeers uint32 - // When true, only one connection per IP is allowed. Prevents eclipse attacks - // among other things. - limitOneInboundConnectionPerIP bool // When --hypersync is set to true we will attempt fast block synchronization HyperSync bool @@ -136,10 +120,8 @@ type ConnectionManager struct { } func NewConnectionManager( - _params *DeSoParams, _addrMgr *addrmgr.AddrManager, _listeners []net.Listener, + _params *DeSoParams, _listeners []net.Listener, _connectIps []string, _timeSource chainlib.MedianTimeSource, - _targetOutboundPeers uint32, _maxInboundPeers uint32, - _limitOneInboundConnectionPerIP bool, _hyperSync bool, _syncType NodeSyncType, _stallTimeoutSeconds uint64, @@ -150,16 +132,13 @@ func NewConnectionManager( ValidateHyperSyncFlags(_hyperSync, _syncType) return &ConnectionManager{ - srv: _srv, - params: _params, - AddrMgr: _addrMgr, - listeners: _listeners, - connectIps: _connectIps, + srv: _srv, + params: _params, + listeners: _listeners, // We keep track of the last N nonces we've sent in order to detect // self connections. sentNonces: lru.NewCache(1000), timeSource: _timeSource, - //newestBlock: _newestBlock, // Initialize the peer data structures. @@ -176,15 +155,13 @@ func NewConnectionManager( newPeerChan: make(chan *Peer, 100), donePeerChan: make(chan *Peer, 100), outboundConnectionChan: make(chan *outboundConnection, 100), + inboundConnectionChan: make(chan *inboundConnection, 100), - targetOutboundPeers: _targetOutboundPeers, - maxInboundPeers: _maxInboundPeers, - limitOneInboundConnectionPerIP: _limitOneInboundConnectionPerIP, - HyperSync: _hyperSync, - SyncType: _syncType, - serverMessageQueue: _serverMessageQueue, - stallTimeoutSeconds: _stallTimeoutSeconds, - minFeeRateNanosPerKB: _minFeeRateNanosPerKB, + HyperSync: _hyperSync, + SyncType: _syncType, + serverMessageQueue: _serverMessageQueue, + stallTimeoutSeconds: _stallTimeoutSeconds, + minFeeRateNanosPerKB: _minFeeRateNanosPerKB, } } @@ -224,40 +201,6 @@ func (cmgr *ConnectionManager) subFromGroupKey(na *wire.NetAddress) { cmgr.mtxOutboundConnIPGroups.Unlock() } -func (cmgr *ConnectionManager) getRandomAddr() *wire.NetAddress { - for tries := 0; tries < 100; tries++ { - addr := cmgr.AddrMgr.GetAddress() - if addr == nil { - glog.V(2).Infof("ConnectionManager.getRandomAddr: addr from GetAddressWithExclusions was nil") - break - } - - // Lock the address map since multiple threads will be trying to read - // and modify it at the same time. - cmgr.mtxAddrsMaps.RLock() - ok := cmgr.connectedOutboundAddrs[addrmgr.NetAddressKey(addr.NetAddress())] - cmgr.mtxAddrsMaps.RUnlock() - if ok { - glog.V(2).Infof("ConnectionManager.getRandomAddr: Not choosing already connected address %v:%v", addr.NetAddress().IP, addr.NetAddress().Port) - continue - } - - // We can only have one outbound address per /16. This is similar to - // Bitcoin and we do it to prevent Sybil attacks. 
- if cmgr.IsFromRedundantOutboundIPAddress(addr.NetAddress()) { - glog.V(2).Infof("ConnectionManager.getRandomAddr: Not choosing address due to redundant group key %v:%v", addr.NetAddress().IP, addr.NetAddress().Port) - continue - } - - glog.V(2).Infof("ConnectionManager.getRandomAddr: Returning %v:%v at %d iterations", - addr.NetAddress().IP, addr.NetAddress().Port, tries) - return addr.NetAddress() - } - - glog.V(2).Infof("ConnectionManager.getRandomAddr: Returning nil") - return nil -} - func _delayRetry(retryCount uint64, persistentAddrForLogging *wire.NetAddress, unit time.Duration) (_retryDuration time.Duration) { // No delay if we haven't tried yet or if the number of retries isn't positive. if retryCount <= 0 { @@ -276,42 +219,6 @@ func _delayRetry(retryCount uint64, persistentAddrForLogging *wire.NetAddress, u return retryDelay } -func (cmgr *ConnectionManager) enoughOutboundPeers() bool { - val := atomic.LoadUint32(&cmgr.numOutboundPeers) - if val > cmgr.targetOutboundPeers { - glog.Errorf("enoughOutboundPeers: Connected to too many outbound "+ - "peers: (%d). Should be "+ - "no more than (%d).", val, cmgr.targetOutboundPeers) - return true - } - - if val == cmgr.targetOutboundPeers { - return true - } - return false -} - -func IPToNetAddr(ipStr string, addrMgr *addrmgr.AddrManager, params *DeSoParams) (*wire.NetAddress, error) { - port := params.DefaultSocketPort - host, portstr, err := net.SplitHostPort(ipStr) - if err != nil { - // No port specified so leave port=default and set - // host to the ipStr. - host = ipStr - } else { - pp, err := strconv.ParseUint(portstr, 10, 16) - if err != nil { - return nil, errors.Wrapf(err, "IPToNetAddr: Can not parse port from %s for ip", ipStr) - } - port = uint16(pp) - } - netAddr, err := addrMgr.HostToNetAddress(host, port, 0) - if err != nil { - return nil, errors.Wrapf(err, "IPToNetAddr: Can not parse port from %s for ip", ipStr) - } - return netAddr, nil -} - func (cmgr *ConnectionManager) IsConnectedOutboundIpAddress(netAddr *wire.NetAddress) bool { cmgr.mtxAddrsMaps.RLock() defer cmgr.mtxAddrsMaps.RUnlock() diff --git a/lib/network.go b/lib/network.go index 80d412c4f..9e9795678 100644 --- a/lib/network.go +++ b/lib/network.go @@ -1543,8 +1543,7 @@ func (msg *MsgDeSoPong) FromBytes(data []byte) error { type ServiceFlag uint64 const ( - // SFFullNodeDeprecated is deprecated, and set on all nodes by default - // now. We basically split it into SFHyperSync and SFArchivalMode. + // SFFullNodeDeprecated is deprecated, and set on all nodes by default now. SFFullNodeDeprecated ServiceFlag = 1 << 0 // SFHyperSync is a flag used to indicate that the peer supports hyper sync. SFHyperSync ServiceFlag = 1 << 1 diff --git a/lib/peer.go b/lib/peer.go index 98d2c135e..0af9aa0b7 100644 --- a/lib/peer.go +++ b/lib/peer.go @@ -1192,11 +1192,12 @@ func (pp *Peer) Start() { // If the address manager needs more addresses, then send a GetAddr message // to the peer. This is best-effort. if pp.cmgr != nil { - if pp.cmgr.AddrMgr.NeedMoreAddresses() { + // TODO: Move this to ConnectionController. + /*if pp.cmgr.AddrMgr.NeedMoreAddresses() { go func() { pp.QueueMessage(&MsgDeSoGetAddr{}) }() - } + }*/ } // Send our verack message now that the IO processing machinery has started. 
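The new lib/pos_connection_controller.go below introduces ConnectionController, which takes over the address manager, the --connect-ips list, and the inbound/outbound peer limits that are removed from ConnectionManager in this patch. A minimal wiring sketch, mirroring the constructor calls that appear in lib/server.go further down (variable names are the NewServer parameters used there):

// Sketch of the wiring done in NewServer: the controller owns the addrmgr and
// the peer limits that previously lived on ConnectionManager, while the
// HandshakeController drives version/verack negotiation through the
// RemoteNodeManager.
rnManager := NewRemoteNodeManager(srv, _chain, _cmgr, _blsKeystore, _params, _minFeeRateNanosPerKB, nodeServices)
srv.connectionController = NewConnectionController(_params, _cmgr, rnManager, _blsKeystore.GetSigner(),
	_desoAddrMgr, _targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP)
srv.handshakeController = NewHandshakeController(rnManager)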
diff --git a/lib/pos_connection_controller.go b/lib/pos_connection_controller.go new file mode 100644 index 000000000..1b6442546 --- /dev/null +++ b/lib/pos_connection_controller.go @@ -0,0 +1,753 @@ +package lib + +import ( + "fmt" + "github.com/btcsuite/btcd/addrmgr" + "github.com/btcsuite/btcd/wire" + "github.com/davecgh/go-spew/spew" + "github.com/deso-protocol/core/bls" + "github.com/deso-protocol/go-deadlock" + "github.com/golang/glog" + "github.com/pkg/errors" + "net" + "strconv" + "sync" + "time" +) + +type ConnectionController struct { + // The parameters we are initialized with. + params *DeSoParams + + cmgr *ConnectionManager + signer *BLSSigner + + handshake *HandshakeController + + rnManager *RemoteNodeManager + + validatorMapLock sync.Mutex + getActiveValidators func() map[bls.SerializedPublicKey]*ValidatorEntry + + // The address manager keeps track of peer addresses we're aware of. When + // we need to connect to a new outbound peer, it chooses one of the addresses + // it's aware of at random and provides it to us. + AddrMgr *addrmgr.AddrManager + + // addrsToBroadcast is a list of all the addresses we've received from valid addr + // messages that we intend to broadcast to our peers. It is organized as: + // -> . + // + // It is organized in this way so that we can limit the number of addresses we + // are distributing for a single peer to avoid a DOS attack. + addrsToBroadcastLock deadlock.RWMutex + addrsToBroadcast map[string][]*SingleAddr + + // When --connectips is set, we don't connect to anything from the addrmgr. + connectIps []string + + // The target number of outbound peers we want to have. + targetOutboundPeers uint32 + // The maximum number of inbound peers we allow. + maxInboundPeers uint32 + // When true, only one connection per IP is allowed. Prevents eclipse attacks + // among other things. + limitOneInboundConnectionPerIP bool + + startGroup sync.WaitGroup + exitChan chan struct{} + exitGroup sync.WaitGroup +} + +func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, rnManager *RemoteNodeManager, signer *BLSSigner, + addrMgr *addrmgr.AddrManager, targetOutboundPeers uint32, maxInboundPeers uint32, + limitOneInboundConnectionPerIP bool) *ConnectionController { + + return &ConnectionController{ + params: params, + cmgr: cmgr, + signer: signer, + rnManager: rnManager, + AddrMgr: addrMgr, + addrsToBroadcast: make(map[string][]*SingleAddr), + targetOutboundPeers: targetOutboundPeers, + maxInboundPeers: maxInboundPeers, + limitOneInboundConnectionPerIP: limitOneInboundConnectionPerIP, + exitChan: make(chan struct{}), + } +} + +func (cc *ConnectionController) Start() { + cc.startGroup.Add(3) + // Start the validator connector + go cc.startValidatorConnector() + + cc.startGroup.Wait() + cc.exitGroup.Add(3) +} + +func (cc *ConnectionController) Stop() { + close(cc.exitChan) + cc.exitGroup.Wait() +} + +func (cc *ConnectionController) GetRemoteNodeManager() *RemoteNodeManager { + return cc.rnManager +} + +func (cc *ConnectionController) initiatePersistentConnections() { + // This is a hack to make outbound connections go away. + if cc.targetOutboundPeers == 0 { + return + } + if len(cc.connectIps) > 0 { + // Connect to addresses passed via the --connect-ips flag. These addresses + // are persistent in the sense that if we disconnect from one, we will + // try to reconnect to the same one. 
+ for _, connectIp := range cc.connectIps { + if err := cc.createPersistentOutboundConnection(connectIp); err != nil { + glog.Errorf("ConnectionController.initiatePersistentConnections: Problem connecting "+ + "to connectIp %v: %v", connectIp, err) + } + } + } +} + +func (cc *ConnectionController) startValidatorConnector() { + cc.startGroup.Done() + for { + select { + case <-cc.exitChan: + cc.exitGroup.Done() + return + case <-time.After(1 * time.Minute): + cc.validatorMapLock.Lock() + activeValidatorsMap := cc.getActiveValidators() + cc.refreshValidatorIndex(activeValidatorsMap) + cc.connectValidators(activeValidatorsMap) + cc.validatorMapLock.Unlock() + } + } +} + +func (cc *ConnectionController) startPeerConnector() { + cc.startGroup.Done() + + for { + select { + case <-cc.exitChan: + cc.exitGroup.Done() + return + case <-time.After(1 * time.Second): + // Only connect to addresses from the addrmgr if we don't specify --connect-ips. + // These addresses are *not* persistent, meaning if we disconnect from one we'll + // try a different one. + // TODO: Do we still want this? + if len(cc.connectIps) == 0 { + continue + } + + cc.refreshOutboundIndex() + cc.refreshInboundIndex() + cc.connectPeers() + } + } +} + +// Must be run inside a goroutine. Relays addresses to peers at regular intervals +// and relays our own address to peers once every 24 hours. +func (cc *ConnectionController) startAddressRelayer() { + cc.startGroup.Done() + numMinutesPassed := 0 + for { + select { + case <-cc.exitChan: + cc.exitGroup.Done() + return + case <-time.After(AddrRelayIntervalSeconds * time.Second): + // For the first ten minutes after the connection controller starts, relay our address to all + // peers. After the first ten minutes, do it once every 24 hours. + glog.V(1).Infof("ConnectionController.startAddressRelayer: Relaying our own addr to peers") + if numMinutesPassed < 10 || numMinutesPassed%(RebroadcastNodeAddrIntervalMinutes) == 0 { + // TODO: Change to retrieve all RemoteNodes from the indexer. + for _, pp := range cc.cmgr.GetAllPeers() { + bestAddress := cc.AddrMgr.GetBestLocalAddress(pp.netAddr) + if bestAddress != nil { + glog.V(2).Infof("ConnectionController.startAddressRelayer: Relaying address %v to "+ + "peer %v", bestAddress.IP.String(), pp) + if err := cc.cmgr.SendMessage(&MsgDeSoAddr{ + AddrList: []*SingleAddr{ + { + Timestamp: time.Now(), + IP: bestAddress.IP, + Port: bestAddress.Port, + Services: (ServiceFlag)(bestAddress.Services), + }, + }, + }, pp.ID); err != nil { + glog.Errorf("ConnectionController.startAddressRelayer: Problem sending "+ + "MsgDeSoAddr to peer %v: %v", pp, err) + } + } + } + } + + glog.V(2).Infof("ConnectionController.startAddressRelayer: Seeing if there are addrs to relay...") + // Broadcast the addrs we have to all of our peers. + addrsToBroadcast := cc.getAddrsToBroadcast() + if len(addrsToBroadcast) == 0 { + glog.V(2).Infof("ConnectionController.startAddressRelayer: No addrs to relay.") + time.Sleep(AddrRelayIntervalSeconds * time.Second) + continue + } + + glog.V(2).Infof("ConnectionController.startAddressRelayer: Found %d addrs to "+ + "relay: %v", len(addrsToBroadcast), spew.Sdump(addrsToBroadcast)) + // Iterate over all our peers and broadcast the addrs to all of them. 
+ for _, pp := range cc.cmgr.GetAllPeers() { + pp.AddDeSoMessage(&MsgDeSoAddr{ + AddrList: addrsToBroadcast, + }, false) + } + time.Sleep(AddrRelayIntervalSeconds * time.Second) + numMinutesPassed++ + } + } +} + +// ########################### +// ## Handlers (Peer, DeSoMessage) +// ########################### + +func (cc *ConnectionController) _handleDonePeerMessage(origin *Peer, desoMsg DeSoMessage) { + if desoMsg.GetMsgType() != MsgTypeDisconnectedPeer { + return + } + + cc.rnManager.DisconnectById(NewRemoteNodeId(origin.ID)) +} + +func (cc *ConnectionController) _handleAddrMessage(origin *Peer, desoMsg DeSoMessage) { + if desoMsg.GetMsgType() != MsgTypeAddr { + return + } + + id := NewRemoteNodeId(origin.ID) + var msg *MsgDeSoAddr + var ok bool + if msg, ok = desoMsg.(*MsgDeSoAddr); !ok { + glog.Errorf("ConnectionController._handleAddrMessage: Problem decoding "+ + "MsgDeSoAddr: %v", spew.Sdump(desoMsg)) + cc.rnManager.DisconnectById(id) + return + } + + cc.addrsToBroadcastLock.Lock() + defer cc.addrsToBroadcastLock.Unlock() + + glog.V(1).Infof("ConnectionController._handleAddrMessage: Received Addr from peer %v with addrs %v", origin, spew.Sdump(msg.AddrList)) + + // If this addr message contains more than the maximum allowed number of addresses + // then disconnect this peer. + if len(msg.AddrList) > MaxAddrsPerAddrMsg { + glog.Errorf(fmt.Sprintf("ConnectionController._handleAddrMessage: Disconnecting "+ + "Peer %v for sending us an addr message with %d transactions, which exceeds "+ + "the max allowed %d", + origin, len(msg.AddrList), MaxAddrsPerAddrMsg)) + + cc.rnManager.DisconnectById(id) + return + } + + // Add all the addresses we received to the addrmgr. + netAddrsReceived := []*wire.NetAddress{} + for _, addr := range msg.AddrList { + addrAsNetAddr := wire.NewNetAddressIPPort(addr.IP, addr.Port, (wire.ServiceFlag)(addr.Services)) + if !addrmgr.IsRoutable(addrAsNetAddr) { + glog.V(1).Infof("Dropping address %v from peer %v because it is not routable", addr, origin) + continue + } + + netAddrsReceived = append( + netAddrsReceived, addrAsNetAddr) + } + cc.AddrMgr.AddAddresses(netAddrsReceived, origin.netAddr) + + // If the message had <= 10 addrs in it, then queue all the addresses for relaying + // on the next cycle. + if len(msg.AddrList) <= 10 { + glog.V(1).Infof("ConnectionController._handleAddrMessage: Queueing %d addrs for forwarding from "+ + "peer %v", len(msg.AddrList), origin) + sourceAddr := &SingleAddr{ + Timestamp: time.Now(), + IP: origin.netAddr.IP, + Port: origin.netAddr.Port, + Services: origin.serviceFlags, + } + listToAddTo, hasSeenSource := cc.addrsToBroadcast[sourceAddr.StringWithPort(false /*includePort*/)] + if !hasSeenSource { + listToAddTo = []*SingleAddr{} + } + // If this peer has been sending us a lot of little crap, evict a lot of their + // stuff but don't disconnect. + if len(listToAddTo) > MaxAddrsPerAddrMsg { + listToAddTo = listToAddTo[:MaxAddrsPerAddrMsg/2] + } + listToAddTo = append(listToAddTo, msg.AddrList...) 
+ cc.addrsToBroadcast[sourceAddr.StringWithPort(false /*includePort*/)] = listToAddTo + } +} + +func (cc *ConnectionController) _handleGetAddrMessage(origin *Peer, desoMsg DeSoMessage) { + if desoMsg.GetMsgType() != MsgTypeGetAddr { + return + } + + id := NewRemoteNodeId(origin.ID) + if _, ok := desoMsg.(*MsgDeSoGetAddr); !ok { + glog.Errorf("ConnectionController._handleAddrMessage: Problem decoding "+ + "MsgDeSoAddr: %v", spew.Sdump(desoMsg)) + cc.rnManager.DisconnectById(id) + return + } + + glog.V(1).Infof("Server._handleGetAddrMessage: Received GetAddr from peer %v", origin) + // When we get a GetAddr message, choose MaxAddrsPerMsg from the AddrMgr + // and send them back to the peer. + netAddrsFound := cc.AddrMgr.AddressCache() + if len(netAddrsFound) > MaxAddrsPerAddrMsg { + netAddrsFound = netAddrsFound[:MaxAddrsPerAddrMsg] + } + + // Convert the list to a SingleAddr list. + res := &MsgDeSoAddr{} + for _, netAddr := range netAddrsFound { + singleAddr := &SingleAddr{ + Timestamp: time.Now(), + IP: netAddr.IP, + Port: netAddr.Port, + Services: (ServiceFlag)(netAddr.Services), + } + res.AddrList = append(res.AddrList, singleAddr) + } + rn := cc.rnManager.GetRemoteNodeById(id) + if err := cc.rnManager.SendMessage(rn, res); err != nil { + glog.Errorf("Server._handleGetAddrMessage: Problem sending addr message to peer %v: %v", origin, err) + cc.rnManager.DisconnectById(id) + return + } +} + +func (cc *ConnectionController) _handleNewConnectionMessage(origin *Peer, desoMsg DeSoMessage) { + if desoMsg.GetMsgType() != MsgTypeNewConnection { + return + } + + var msg *MsgDeSoNewConnection + var ok bool + if msg, ok = desoMsg.(*MsgDeSoNewConnection); !ok { + return + } + + var remoteNode *RemoteNode + var err error + switch msg.Connection.GetConnectionType() { + case ConnectionTypeInbound: + remoteNode, err = cc.processInboundConnection(msg.Connection) + if remoteNode == nil || err != nil { + glog.Errorf("ConnectionController.handleNewConnectionMessage: Problem handling inbound connection: %v", err) + msg.Connection.Close() + return + } + case ConnectionTypeOutbound: + remoteNode, err = cc.processOutboundConnection(msg.Connection) + if remoteNode == nil || err != nil { + glog.Errorf("ConnectionController.handleNewConnectionMessage: Problem handling outbound connection: %v", err) + var oc *outboundConnection + if oc, ok = msg.Connection.(*outboundConnection); !ok { + return + } + cc.cleanupFailedOutboundConnection(oc) + msg.Connection.Close() + return + } + } + + // If we made it here, we have a valid remote node. We will now initiate the handshake. + cc.handshake.InitiateHandshake(remoteNode) +} + +func (cc *ConnectionController) cleanupFailedOutboundConnection(oc *outboundConnection) { + id := NewRemoteNodeId(oc.attemptId) + rn := cc.rnManager.GetRemoteNodeById(id) + if rn != nil { + cc.rnManager.Disconnect(rn) + } + cc.cmgr.RemoveAttemptedOutboundAddrs(oc.address) +} + +// ########################### +// ## Validator Connections +// ########################### + +func (cc *ConnectionController) refreshValidatorIndex(activeValidatorsMap map[bls.SerializedPublicKey]*ValidatorEntry) { + // De-index inactive validators. Doesn't matter if the RemoteNodes are connected or not, if we exceed the number of + // outbound/inbound peers, these RemoteNodes will be Disconnected later. + // FIXME: Should we care about never-connecting persistent dials? 
+ validatorRemoteNodeMap := cc.rnManager.GetValidatorIndex().Copy() + for pk, rn := range validatorRemoteNodeMap { + if _, ok := activeValidatorsMap[pk]; !ok { + cc.rnManager.SetNonValidator(rn) + cc.rnManager.UnsetValidator(rn) + } + } + + // Look for validators in our existing outbound / inbound connections. + allNonValidators := cc.rnManager.GetAllNonValidators() + for _, rn := range allNonValidators { + pk := rn.GetValidatorPublicKey() + if pk == nil { + continue + } + if _, ok := activeValidatorsMap[pk.Serialize()]; ok { + cc.rnManager.SetValidator(rn) + cc.rnManager.UnsetNonValidator(rn) + } + } +} + +func (cc *ConnectionController) connectValidators(activeValidatorsMap map[bls.SerializedPublicKey]*ValidatorEntry) { + for pk, validator := range activeValidatorsMap { + _, exists := cc.rnManager.GetValidatorIndex().Get(pk) + if !exists { + // FIXME: for now we'll only use the first address in the ValidatorEntry + address := string(validator.Domains[0]) + if err := cc.createValidatorConnection(address, pk); err != nil { + // TODO: Do we want to log an error here? + continue + } + } + } +} + +// ########################### +// ## Peer Connections +// ########################### + +func (cc *ConnectionController) connectPeers() { + numOutboundPeers := uint32(cc.rnManager.GetNonValidatorOutboundIndex().Count()) + + remainingOutboundPeers := uint32(0) + if numOutboundPeers < cc.targetOutboundPeers { + remainingOutboundPeers = cc.targetOutboundPeers - numOutboundPeers + } + for ii := uint32(0); ii < remainingOutboundPeers; ii++ { + addr := cc.getRandomUnconnectedAddress() + cc.AddrMgr.Attempt(addr) + // FIXME: error handle + cc.rnManager.CreateNonValidatorOutboundConnection(addr) + } +} + +func (cc *ConnectionController) refreshOutboundIndex() { + numOutboundPeers := uint32(cc.rnManager.GetNonValidatorOutboundIndex().Count()) + + excessiveOutboundPeers := uint32(0) + if numOutboundPeers > cc.targetOutboundPeers { + excessiveOutboundPeers = numOutboundPeers - cc.targetOutboundPeers + } + // Disconnect random outbound peers if we have too many peers. + allOutboundRemoteNodes := cc.rnManager.GetNonValidatorOutboundIndex().GetAll() + var attemptedOutboundRemoteNodes, connectedOutboundRemoteNodes []*RemoteNode + + for _, rn := range allOutboundRemoteNodes { + if rn.IsHandshakeCompleted() { + connectedOutboundRemoteNodes = append(connectedOutboundRemoteNodes, rn) + } else { + attemptedOutboundRemoteNodes = append(attemptedOutboundRemoteNodes, rn) + } + } + for _, rn := range attemptedOutboundRemoteNodes { + if excessiveOutboundPeers == 0 { + break + } + cc.rnManager.Disconnect(rn) + excessiveOutboundPeers-- + } + for _, rn := range connectedOutboundRemoteNodes { + if excessiveOutboundPeers == 0 { + break + } + cc.rnManager.Disconnect(rn) + excessiveOutboundPeers-- + } +} + +func (cc *ConnectionController) refreshInboundIndex() { + numConnectedInboundPeers := uint32(cc.rnManager.GetNonValidatorInboundIndex().Count()) + + excessiveInboundPeers := uint32(0) + if numConnectedInboundPeers > cc.maxInboundPeers { + excessiveInboundPeers = numConnectedInboundPeers - cc.maxInboundPeers + } + // Disconnect random inbound peers if we have too many peers. 
+ inboundRemoteNodes := cc.rnManager.GetNonValidatorInboundIndex().GetAll() + for _, rn := range inboundRemoteNodes { + if excessiveInboundPeers == 0 { + break + } + cc.rnManager.Disconnect(rn) + excessiveInboundPeers-- + } +} + +func (cc *ConnectionController) getRandomUnconnectedAddress() *wire.NetAddress { + for tries := 0; tries < 100; tries++ { + addr := cc.AddrMgr.GetAddress() + if addr == nil { + //glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: addr from GetAddressWithExclusions was nil") + break + } + + if cc.cmgr.IsConnectedOutboundIpAddress(addr.NetAddress()) { + //glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: Not choosing address due to redundancy %v:%v", addr.NetAddress().IP, addr.NetAddress().Port) + continue + } + + if cc.cmgr.IsAttemptedOutboundIpAddress(addr.NetAddress()) { + continue + } + + // We can only have one outbound address per /16. This is similar to + // Bitcoin and we do it to prevent Sybil attacks. + if cc.cmgr.IsFromRedundantOutboundIPAddress(addr.NetAddress()) { + //glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: Not choosing address due to redundant group key %v:%v", addr.NetAddress().IP, addr.NetAddress().Port) + continue + } + + return addr.NetAddress() + } + + //glog.V(2).Infof("ConnectionManager.getRandomAddr: Returning nil") + return nil +} + +func (cc *ConnectionController) SetTargetOutboundPeers(numPeers uint32) { + cc.targetOutboundPeers = numPeers +} + +func (cc *ConnectionController) enoughInboundPeers() bool { + return uint32(cc.rnManager.GetNonValidatorInboundIndex().Count()) >= cc.maxInboundPeers +} + +func (cc *ConnectionController) enoughOutboundPeers() bool { + return uint32(cc.rnManager.GetNonValidatorOutboundIndex().Count()) >= cc.targetOutboundPeers +} + +func (cc *ConnectionController) processInboundConnection(conn Connection) (*RemoteNode, error) { + var ic *inboundConnection + var ok bool + if ic, ok = conn.(*inboundConnection); !ok { + return nil, fmt.Errorf("ConnectionController.handleInboundConnection: Connection is not an inboundConnection") + } + + // As a quick check, reject the peer if we have too many already. Note that + // this check isn't perfect but we have a later check at the end after doing + // a version negotiation that will properly reject the peer if this check + // messes up e.g. due to a concurrency issue. + // + // TODO: We should instead have eviction logic here to prevent + // someone from monopolizing a node's inbound connections. + if cc.enoughInboundPeers() { + return nil, fmt.Errorf("ConnectionController.handleInboundConnection: Rejecting INBOUND peer (%s) due to max "+ + "inbound peers (%d) hit", ic.connection.RemoteAddr().String(), cc.maxInboundPeers) + } + + // If we want to limit inbound connections to one per IP address, check to + // make sure this address isn't already connected. 
+ if cc.limitOneInboundConnectionPerIP && + cc.isFromRedundantInboundIPAddress(ic.connection.RemoteAddr()) { + + return nil, fmt.Errorf("ConnectionController.handleInboundConnection: Rejecting INBOUND peer (%s) due to already having an "+ + "inbound connection from the same IP with limit_one_inbound_connection_per_ip set", + ic.connection.RemoteAddr().String()) + } + + na, err := cc.ConvertIPStringToNetAddress(ic.connection.RemoteAddr().String()) + if err != nil { + return nil, errors.Wrapf(err, "ConnectionController.handleInboundConnection: Problem calling ipToNetAddr "+ + "for addr: (%s)", ic.connection.RemoteAddr().String()) + } + + remoteNode, err := cc.rnManager.AttachInboundConnection(ic.connection, na) + if err != nil { + return nil, errors.Wrapf(err, "ConnectionController.handleInboundConnection: Problem calling rnManager.AttachInboundConnection "+ + "for addr: (%s)", ic.connection.RemoteAddr().String()) + } + + return remoteNode, nil +} + +func (cc *ConnectionController) processOutboundConnection(conn Connection) (*RemoteNode, error) { + var oc *outboundConnection + var ok bool + if oc, ok = conn.(*outboundConnection); !ok { + return nil, fmt.Errorf("ConnectionController.handleOutboundConnection: Connection is not an outboundConnection") + } + + if oc.failed { + return nil, fmt.Errorf("ConnectionController.handleOutboundConnection: Failed to connect to peer (%s)", oc.address.IP.String()) + } + + if !oc.isPersistent { + cc.AddrMgr.Connected(oc.address) + cc.AddrMgr.Good(oc.address) + } + + // if this is a non-persistent outbound peer and we already have enough + // outbound peers, then don't bother adding this one. + if !oc.isPersistent && cc.enoughOutboundPeers() { + return nil, fmt.Errorf("ConnectionController.handleOutboundConnection: Connected to maximum number of outbound "+ + "peers (%d)", cc.targetOutboundPeers) + } + + // If this is a non-persistent outbound peer and the group key + // overlaps with another peer we're already connected to then + // abort mission. We only connect to one peer per IP group in + // order to prevent Sybil attacks. 
+ if !oc.isPersistent && cc.cmgr.IsFromRedundantOutboundIPAddress(oc.address) { + + // TODO: Make this less verbose + return nil, fmt.Errorf("ConnectionController.handleOutboundConnection: Rejecting OUTBOUND NON-PERSISTENT connection with "+ + "redundant group key (%s).", addrmgr.GroupKey(oc.address)) + } + + na, err := cc.ConvertIPStringToNetAddress(oc.connection.RemoteAddr().String()) + if err != nil { + return nil, errors.Wrapf(err, "ConnectionController.handleOutboundConnection: Problem calling ipToNetAddr "+ + "for addr: (%s)", oc.connection.RemoteAddr().String()) + } + + remoteNode, err := cc.rnManager.AttachOutboundConnection(oc.connection, na, oc.attemptId, oc.isPersistent) + if err != nil { + return nil, errors.Wrapf(err, "ConnectionController.handleOutboundConnection: Problem calling rnManager.AttachOutboundConnection "+ + "for addr: (%s)", oc.connection.RemoteAddr().String()) + } + return remoteNode, nil +} + +func (cc *ConnectionController) createValidatorConnection(ipStr string, pk bls.SerializedPublicKey) (_err error) { + netAddr, err := cc.ConvertIPStringToNetAddress(ipStr) + if err != nil { + return err + } + publicKey, err := pk.Deserialize() + if err != nil { + return err + } + return cc.rnManager.CreateValidatorConnection(netAddr, publicKey) +} + +func (cc *ConnectionController) createPersistentOutboundConnection(ipStr string) (_err error) { + netAddr, err := cc.ConvertIPStringToNetAddress(ipStr) + if err != nil { + return err + } + return cc.rnManager.CreateNonValidatorPersistentOutboundConnection(netAddr) +} + +func (cc *ConnectionController) ConvertIPStringToNetAddress(ipStr string) (*wire.NetAddress, error) { + netAddr, err := IPToNetAddr(ipStr, cc.AddrMgr, cc.params) + if err != nil { + return nil, errors.Wrapf(err, + "ConnectionController.ConvertIPStringToNetAddress: Problem parsing "+ + "ipString to wire.NetAddress") + } + if netAddr == nil { + return nil, fmt.Errorf("ConnectionController.ConvertIPStringToNetAddress: " + + "address was nil after parsing") + } + return netAddr, nil +} + +func IPToNetAddr(ipStr string, addrMgr *addrmgr.AddrManager, params *DeSoParams) (*wire.NetAddress, error) { + port := params.DefaultSocketPort + host, portstr, err := net.SplitHostPort(ipStr) + if err != nil { + // No port specified so leave port=default and set + // host to the ipStr. + host = ipStr + } else { + pp, err := strconv.ParseUint(portstr, 10, 16) + if err != nil { + return nil, errors.Wrapf(err, "IPToNetAddr: Can not parse port from %s for ip", ipStr) + } + port = uint16(pp) + } + netAddr, err := addrMgr.HostToNetAddress(host, port, 0) + if err != nil { + return nil, errors.Wrapf(err, "IPToNetAddr: Can not parse port from %s for ip", ipStr) + } + return netAddr, nil +} + +func (cc *ConnectionController) isFromRedundantInboundIPAddress(addr net.Addr) bool { + netAddr, err := IPToNetAddr(addr.String(), cc.AddrMgr, cc.params) + if err != nil { + // Return true in case we have an error. We do this because it + // will result in the peer connection not being accepted, which + // is desired in this case. 
+ glog.Warningf(errors.Wrapf(err, + "ConnectionController._isFromRedundantInboundIPAddress: Problem parsing "+ + "net.Addr to wire.NetAddress so marking as redundant and not "+ + "making connection").Error()) + return true + } + if netAddr == nil { + glog.Warningf("ConnectionController._isFromRedundantInboundIPAddress: " + + "address was nil after parsing so marking as redundant and not " + + "making connection") + return true + } + + return cc.cmgr.IsFromRedundantInboundIPAddress(netAddr) +} + +func (cc *ConnectionController) getAddrsToBroadcast() []*SingleAddr { + cc.addrsToBroadcastLock.Lock() + defer cc.addrsToBroadcastLock.Unlock() + + // If there's nothing in the map, return. + if len(cc.addrsToBroadcast) == 0 { + return []*SingleAddr{} + } + + // If we get here then we have some addresses to broadcast. + addrsToBroadcast := []*SingleAddr{} + for uint32(len(addrsToBroadcast)) < cc.params.MaxAddressesToBroadcast && + len(cc.addrsToBroadcast) > 0 { + // Choose a key at random. This works because map iteration is random in golang. + bucket := "" + for kk := range cc.addrsToBroadcast { + bucket = kk + break + } + + // Remove the last element from the slice for the given bucket. + currentAddrList := cc.addrsToBroadcast[bucket] + if len(currentAddrList) > 0 { + lastIndex := len(currentAddrList) - 1 + currentAddr := currentAddrList[lastIndex] + currentAddrList = currentAddrList[:lastIndex] + if len(currentAddrList) == 0 { + delete(cc.addrsToBroadcast, bucket) + } else { + cc.addrsToBroadcast[bucket] = currentAddrList + } + + addrsToBroadcast = append(addrsToBroadcast, currentAddr) + } + } + + return addrsToBroadcast +} diff --git a/lib/server.go b/lib/server.go index d1c82e5b3..c7c2cc53c 100644 --- a/lib/server.go +++ b/lib/server.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/hex" "fmt" - "github.com/btcsuite/btcd/wire" "github.com/deso-protocol/core/consensus" "net" "reflect" @@ -62,7 +61,9 @@ type Server struct { eventManager *EventManager TxIndex *TXIndex + handshakeController *HandshakeController // fastHotStuffEventLoop consensus.FastHotStuffEventLoop + connectionController *ConnectionController // posMempool *PosMemPool TODO: Add the mempool later // All messages received from peers get sent from the ConnectionManager to the @@ -120,15 +121,6 @@ type Server struct { // requested data but have not yet received a response. requestedTransactionsMap map[BlockHash]*GetDataRequestInfo - // addrsToBroadcast is a list of all the addresses we've received from valid addr - // messages that we intend to broadcast to our peers. It is organized as: - // -> . - // - // It is organized in this way so that we can limit the number of addresses we - // are distributing for a single peer to avoid a DOS attack. - addrsToBroadcastLock deadlock.RWMutex - addrsToBroadcastt map[string][]*SingleAddr - // When set to true, we disable the ConnectionManager DisableNetworking bool @@ -175,6 +167,10 @@ func (srv *Server) ResetRequestQueues() { srv.requestedTransactionsMap = make(map[BlockHash]*GetDataRequestInfo) } +func (srv *Server) GetConnectionController() *ConnectionController { + return srv.connectionController +} + // dataLock must be acquired for writing before calling this function. func (srv *Server) _removeRequest(hash *BlockHash) { // Just be lazy and remove the hash from everything indiscriminately to @@ -445,8 +441,7 @@ func NewServer( // Create a new connection manager but note that it won't be initialized until Start(). 
 	_incomingMessages := make(chan *ServerMessage, (_targetOutboundPeers+_maxInboundPeers)*3)
 	_cmgr := NewConnectionManager(
-		_params, _desoAddrMgr, _listeners, _connectIps, timesource,
-		_targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP,
+		_params, _listeners, _connectIps, timesource,
 		_hyperSync, _syncType, _stallTimeoutSeconds, _minFeeRateNanosPerKB,
 		_incomingMessages, srv)

@@ -481,6 +476,23 @@ func NewServer(
 		hex.EncodeToString(_chain.blockTip().Hash[:]),
 		hex.EncodeToString(BigintToHash(_chain.blockTip().CumWork)[:]))

+	// TODO: services flag
+	nodeServices := SFFullNodeDeprecated
+	if _hyperSync {
+		nodeServices |= SFHyperSync
+	}
+	if archivalMode {
+		nodeServices |= SFArchivalNode
+	}
+	if _blsKeystore != nil {
+		nodeServices |= SFPosValidator
+	}
+	rnManager := NewRemoteNodeManager(srv, _chain, _cmgr, _blsKeystore, _params, _minFeeRateNanosPerKB, nodeServices)
+
+	srv.connectionController = NewConnectionController(_params, _cmgr, rnManager, _blsKeystore.GetSigner(), _desoAddrMgr,
+		_targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP)
+	srv.handshakeController = NewHandshakeController(rnManager)
+
 	if srv.stateChangeSyncer != nil {
 		srv.stateChangeSyncer.BlockHeight = uint64(_chain.headerTip().Height)
 	}
@@ -567,9 +579,6 @@ func NewServer(
 		srv.StartStatsdReporter()
 	}

-	// Initialize the addrs to broadcast map.
-	srv.addrsToBroadcastt = make(map[string][]*SingleAddr)
-
 	// This will initialize the request queues.
 	srv.ResetRequestQueues()

@@ -2147,92 +2156,16 @@ func (srv *Server) StartStatsdReporter() {
 	}()
 }

-func (srv *Server) _handleAddrMessage(pp *Peer, msg *MsgDeSoAddr) {
-	srv.addrsToBroadcastLock.Lock()
-	defer srv.addrsToBroadcastLock.Unlock()
-
-	glog.V(1).Infof("Server._handleAddrMessage: Received Addr from peer %v with addrs %v", pp, spew.Sdump(msg.AddrList))
-
-	// If this addr message contains more than the maximum allowed number of addresses
-	// then disconnect this peer.
-	if len(msg.AddrList) > MaxAddrsPerAddrMsg {
-		glog.Errorf(fmt.Sprintf("Server._handleAddrMessage: Disconnecting "+
-			"Peer %v for sending us an addr message with %d transactions, which exceeds "+
-			"the max allowed %d",
-			pp, len(msg.AddrList), MaxAddrsPerAddrMsg))
-		pp.Disconnect()
-		return
-	}
-
-	// Add all the addresses we received to the addrmgr.
-	netAddrsReceived := []*wire.NetAddress{}
-	for _, addr := range msg.AddrList {
-		addrAsNetAddr := wire.NewNetAddressIPPort(addr.IP, addr.Port, (wire.ServiceFlag)(addr.Services))
-		if !addrmgr.IsRoutable(addrAsNetAddr) {
-			glog.V(1).Infof("Dropping address %v from peer %v because it is not routable", addr, pp)
-			continue
-		}
-
-		netAddrsReceived = append(
-			netAddrsReceived, addrAsNetAddr)
-	}
-	srv.cmgr.AddrMgr.AddAddresses(netAddrsReceived, pp.netAddr)
-
-	// If the message had <= 10 addrs in it, then queue all the addresses for relaying
-	// on the next cycle.
-	if len(msg.AddrList) <= 10 {
-		glog.V(1).Infof("Server._handleAddrMessage: Queueing %d addrs for forwarding from "+
-			"peer %v", len(msg.AddrList), pp)
-		sourceAddr := &SingleAddr{
-			Timestamp: time.Now(),
-			IP:        pp.netAddr.IP,
-			Port:      pp.netAddr.Port,
-			Services:  pp.serviceFlags,
-		}
-		listToAddTo, hasSeenSource := srv.addrsToBroadcastt[sourceAddr.StringWithPort(false /*includePort*/)]
-		if !hasSeenSource {
-			listToAddTo = []*SingleAddr{}
-		}
-		// If this peer has been sending us a lot of little crap, evict a lot of their
-		// stuff but don't disconnect.
-		if len(listToAddTo) > MaxAddrsPerAddrMsg {
-			listToAddTo = listToAddTo[:MaxAddrsPerAddrMsg/2]
-		}
-		listToAddTo = append(listToAddTo, msg.AddrList...)
-		srv.addrsToBroadcastt[sourceAddr.StringWithPort(false /*includePort*/)] = listToAddTo
-	}
-}
-
-func (srv *Server) _handleGetAddrMessage(pp *Peer, msg *MsgDeSoGetAddr) {
-	glog.V(1).Infof("Server._handleGetAddrMessage: Received GetAddr from peer %v", pp)
-	// When we get a GetAddr message, choose MaxAddrsPerMsg from the AddrMgr
-	// and send them back to the peer.
-	netAddrsFound := srv.cmgr.AddrMgr.AddressCache()
-	if len(netAddrsFound) > MaxAddrsPerAddrMsg {
-		netAddrsFound = netAddrsFound[:MaxAddrsPerAddrMsg]
-	}
-
-	// Convert the list to a SingleAddr list.
-	res := &MsgDeSoAddr{}
-	for _, netAddr := range netAddrsFound {
-		singleAddr := &SingleAddr{
-			Timestamp: time.Now(),
-			IP:        netAddr.IP,
-			Port:      netAddr.Port,
-			Services:  (ServiceFlag)(netAddr.Services),
-		}
-		res.AddrList = append(res.AddrList, singleAddr)
-	}
-	pp.AddDeSoMessage(res, false)
-}
-
 func (srv *Server) _handleControlMessages(serverMessage *ServerMessage) (_shouldQuit bool) {
 	switch serverMessage.Msg.(type) {
 	// Control messages used internally to signal to the server.
 	case *MsgDeSoPeerHandshakeComplete:
-		break
+		srv.handshakeController._handleHandshakePeerMessage(serverMessage.Peer, serverMessage.Msg)
 	case *MsgDeSoDisconnectedPeer:
 		srv._handleDonePeer(serverMessage.Peer)
+		srv.connectionController._handleDonePeerMessage(serverMessage.Peer, serverMessage.Msg)
+	case *MsgDeSoNewConnection:
+		srv.connectionController._handleNewConnectionMessage(serverMessage.Peer, serverMessage.Msg)
 	case *MsgDeSoQuit:
 		return true
 	}
@@ -2244,6 +2177,10 @@ func (srv *Server) _handlePeerMessages(serverMessage *ServerMessage) {
 	// Handle all non-control message types from our Peers.
 	switch msg := serverMessage.Msg.(type) {
 	// Messages sent among peers.
+	case *MsgDeSoAddr:
+		srv.connectionController._handleAddrMessage(serverMessage.Peer, serverMessage.Msg)
+	case *MsgDeSoGetAddr:
+		srv.connectionController._handleGetAddrMessage(serverMessage.Peer, serverMessage.Msg)
 	case *MsgDeSoGetHeaders:
 		srv._handleGetHeaders(serverMessage.Peer, msg)
 	case *MsgDeSoHeaderBundle:
@@ -2266,6 +2203,10 @@ func (srv *Server) _handlePeerMessages(serverMessage *ServerMessage) {
 		srv._handleMempool(serverMessage.Peer, msg)
 	case *MsgDeSoInv:
 		srv._handleInv(serverMessage.Peer, msg)
+	case *MsgDeSoVersion:
+		srv.handshakeController._handleVersionMessage(serverMessage.Peer, serverMessage.Msg)
+	case *MsgDeSoVerack:
+		srv.handshakeController._handleVerackMessage(serverMessage.Peer, serverMessage.Msg)
 	}
 }

@@ -2362,20 +2303,6 @@ func (srv *Server) _startConsensus() {

 			glog.V(2).Infof("Server._startConsensus: Handling message of type %v from Peer %v",
 				serverMessage.Msg.GetMsgType(), serverMessage.Peer)
-
-			// If the message is an addr message we handle it independent of whether or
-			// not the BitcoinManager is synced.
-			if serverMessage.Msg.GetMsgType() == MsgTypeAddr {
-				srv._handleAddrMessage(serverMessage.Peer, serverMessage.Msg.(*MsgDeSoAddr))
-				continue
-			}
-			// If the message is a GetAddr message we handle it independent of whether or
-			// not the BitcoinManager is synced.
-			if serverMessage.Msg.GetMsgType() == MsgTypeGetAddr {
-				srv._handleGetAddrMessage(serverMessage.Peer, serverMessage.Msg.(*MsgDeSoGetAddr))
-				continue
-			}
-
 			srv._handlePeerMessages(serverMessage)

 			// Always check for and handle control messages regardless of whether the
@@ -2396,96 +2323,6 @@ func (srv *Server) _startConsensus() {

 	glog.V(2).Info("Server.Start: Server done")
 }

-func (srv *Server) _getAddrsToBroadcast() []*SingleAddr {
-	srv.addrsToBroadcastLock.Lock()
-	defer srv.addrsToBroadcastLock.Unlock()
-
-	// If there's nothing in the map, return.
-	if len(srv.addrsToBroadcastt) == 0 {
-		return []*SingleAddr{}
-	}
-
-	// If we get here then we have some addresses to broadcast.
-	addrsToBroadcast := []*SingleAddr{}
-	for len(addrsToBroadcast) < 10 && len(srv.addrsToBroadcastt) > 0 {
-		// Choose a key at random. This works because map iteration is random in golang.
-		bucket := ""
-		for kk := range srv.addrsToBroadcastt {
-			bucket = kk
-			break
-		}
-
-		// Remove the last element from the slice for the given bucket.
-		currentAddrList := srv.addrsToBroadcastt[bucket]
-		if len(currentAddrList) > 0 {
-			lastIndex := len(currentAddrList) - 1
-			currentAddr := currentAddrList[lastIndex]
-			currentAddrList = currentAddrList[:lastIndex]
-			if len(currentAddrList) == 0 {
-				delete(srv.addrsToBroadcastt, bucket)
-			} else {
-				srv.addrsToBroadcastt[bucket] = currentAddrList
-			}
-
-			addrsToBroadcast = append(addrsToBroadcast, currentAddr)
-		}
-	}
-
-	return addrsToBroadcast
-}
-
-// Must be run inside a goroutine. Relays addresses to peers at regular intervals
-// and relays our own address to peers once every 24 hours.
-func (srv *Server) _startAddressRelayer() {
-	for numMinutesPassed := 0; ; numMinutesPassed++ {
-		if atomic.LoadInt32(&srv.shutdown) >= 1 {
-			break
-		}
-		// For the first ten minutes after the server starts, relay our address to all
-		// peers. After the first ten minutes, do it once every 24 hours.
-		glog.V(1).Infof("Server.Start._startAddressRelayer: Relaying our own addr to peers")
-		if numMinutesPassed < 10 || numMinutesPassed%(RebroadcastNodeAddrIntervalMinutes) == 0 {
-			for _, pp := range srv.cmgr.GetAllPeers() {
-				bestAddress := srv.cmgr.AddrMgr.GetBestLocalAddress(pp.netAddr)
-				if bestAddress != nil {
-					glog.V(2).Infof("Server.Start._startAddressRelayer: Relaying address %v to "+
-						"peer %v", bestAddress.IP.String(), pp)
-					pp.AddDeSoMessage(&MsgDeSoAddr{
-						AddrList: []*SingleAddr{
-							{
-								Timestamp: time.Now(),
-								IP:        bestAddress.IP,
-								Port:      bestAddress.Port,
-								Services:  (ServiceFlag)(bestAddress.Services),
-							},
-						},
-					}, false)
-				}
-			}
-		}
-
-		glog.V(2).Infof("Server.Start._startAddressRelayer: Seeing if there are addrs to relay...")
-		// Broadcast the addrs we have to all of our peers.
-		addrsToBroadcast := srv._getAddrsToBroadcast()
-		if len(addrsToBroadcast) == 0 {
-			glog.V(2).Infof("Server.Start._startAddressRelayer: No addrs to relay.")
-			time.Sleep(AddrRelayIntervalSeconds * time.Second)
-			continue
-		}
-
-		glog.V(2).Infof("Server.Start._startAddressRelayer: Found %d addrs to "+
-			"relay: %v", len(addrsToBroadcast), spew.Sdump(addrsToBroadcast))
-		// Iterate over all our peers and broadcast the addrs to all of them.
-		for _, pp := range srv.cmgr.GetAllPeers() {
-			pp.AddDeSoMessage(&MsgDeSoAddr{
-				AddrList: addrsToBroadcast,
-			}, false)
-		}
-		time.Sleep(AddrRelayIntervalSeconds * time.Second)
-		continue
-	}
-}
-
 func (srv *Server) _startTransactionRelayer() {
 	// If we've set a maximum sync height, we will not relay transactions.
 	if srv.blockchain.MaxSyncBlockHeight > 0 {
@@ -2578,8 +2415,6 @@ func (srv *Server) Start() {

 	go srv._startConsensus()

-	go srv._startAddressRelayer()
-
 	go srv._startTransactionRelayer()

 	// Once the ConnectionManager is started, peers will be found and connected to and
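
Reviewer note: the sketch below is illustrative only and not part of the patch. It shows one way the relocated IPToNetAddr helper could be exercised from inside the lib package; the function name resolveExamplePeerAddr, the throwaway address manager, and the literal "127.0.0.1:17000" are placeholders.

// resolveExamplePeerAddr is a hypothetical helper (not in the diff) showing a
// call into the relocated IPToNetAddr. It resolves the host, parses the port,
// and falls back to params.DefaultSocketPort when no ":port" suffix is given.
func resolveExamplePeerAddr() (*wire.NetAddress, error) {
	// A throwaway btcd address manager, mirroring how the integration tests
	// construct one.
	addrMgr := addrmgr.New("", net.LookupIP)
	return IPToNetAddr("127.0.0.1:17000", addrMgr, &DeSoMainnetParams)
}

The resulting *wire.NetAddress is the same type the new ConnectionController passes to the RemoteNodeManager and that ConnectionManager.DialOutboundConnection accepts.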