Revert addrMgr changes; proper routine stops
AeonSw4n committed Jan 28, 2024
1 parent 922ae56 commit 84a64a1
Showing 9 changed files with 515 additions and 501 deletions.
6 changes: 2 additions & 4 deletions integration_testing/connection_bridge.go
@@ -113,13 +113,12 @@ func (bridge *ConnectionBridge) createInboundConnection(node *cmd.Node) *lib.Pee

// This channel is redundant in our setting.
messagesFromPeer := make(chan *lib.ServerMessage, 100)
newPeerChan := make(chan *lib.Peer, 100)
donePeerChan := make(chan *lib.Peer, 100)
// Because it is an inbound Peer of the node, it is simultaneously a "fake" outbound Peer of the bridge.
// Hence, we will mark the _isOutbound parameter as "true" in NewPeer.
peer := lib.NewPeer(uint64(lib.RandInt64(math.MaxInt64)), conn, true,
netAddress, true, 10000, 0, &lib.DeSoMainnetParams,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, newPeerChan, donePeerChan)
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, donePeerChan)
return peer
}

@@ -144,11 +143,10 @@ func (bridge *ConnectionBridge) createOutboundConnection(node *cmd.Node, otherNo
addrMgr := addrmgr.New("", net.LookupIP)
na, err := lib.IPToNetAddr(conn.RemoteAddr().String(), addrMgr, otherNode.Params)
messagesFromPeer := make(chan *lib.ServerMessage, 100)
newPeerChan := make(chan *lib.Peer, 100)
donePeerChan := make(chan *lib.Peer, 100)
peer := lib.NewPeer(uint64(lib.RandInt64(math.MaxInt64)), conn,
false, na, false, 10000, 0, bridge.nodeB.Params,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, newPeerChan, donePeerChan)
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, donePeerChan)
bridge.newPeerChan <- peer
//}
}(ll)
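For context on the comments above: the bridge sits between two in-process nodes, so the same connection is an inbound peer from the node's point of view and a "fake" outbound peer from the bridge's, which is why _isOutbound is passed as true. The sketch below illustrates that duality using net.Pipe; it is an analogy only, not the real bridge (which dials actual connections and builds lib.Peer objects as shown in the diff), and the bridgeEnd type is hypothetical.

package main

import (
	"fmt"
	"net"
)

// bridgeEnd represents one side of an in-process "bridge" connection.
// From the node's perspective the connection is inbound; from the
// bridge's perspective the very same connection is outbound.
type bridgeEnd struct {
	conn       net.Conn
	isOutbound bool // true on the bridge side, mirroring the _isOutbound flag above
}

func main() {
	// net.Pipe gives us two connected endpoints without any networking.
	nodeSide, bridgeSide := net.Pipe()

	inboundPeerOfNode := bridgeEnd{conn: nodeSide, isOutbound: false}
	fakeOutboundPeerOfBridge := bridgeEnd{conn: bridgeSide, isOutbound: true}

	// The bridge writes; the node reads the same bytes on its end.
	go func() {
		fakeOutboundPeerOfBridge.conn.Write([]byte("version handshake"))
		fakeOutboundPeerOfBridge.conn.Close()
	}()

	buf := make([]byte, 64)
	n, _ := inboundPeerOfNode.conn.Read(buf)
	fmt.Printf("node received %q on its inbound peer\n", buf[:n])
}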
68 changes: 35 additions & 33 deletions lib/block_producer.go
@@ -5,7 +5,6 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

"github.com/btcsuite/btcd/wire"
@@ -19,6 +18,10 @@ import (
)

type DeSoBlockProducer struct {
startGroup sync.WaitGroup
exitChan chan struct{}
exitGroup sync.WaitGroup

// The minimum amount of time we wait before trying to produce a new block
// template. If this value is set low enough then we will produce a block template
// continuously.
@@ -47,11 +50,6 @@ type DeSoBlockProducer struct {

// producerWaitGroup allows us to wait until the producer has properly closed.
producerWaitGroup sync.WaitGroup
// exit is used to signal that DeSoBlockProducer routines should be terminated.
exit int32
// isAsleep is a helper variable for quitting that indicates whether the DeSoBlockProducer is asleep. While producing
// blocks, we sleep for a few seconds. Instead of waiting for the sleep to finish, we use this variable to quit immediately.
isAsleep int32
}

type BlockTemplateStats struct {
@@ -102,6 +100,7 @@ func NewDeSoBlockProducer(
chain: chain,
params: params,
postgres: postgres,
exitChan: make(chan struct{}),
}, nil
}

@@ -353,10 +352,8 @@ func (desoBlockProducer *DeSoBlockProducer) _getBlockTemplate(publicKey []byte)
}

func (desoBlockProducer *DeSoBlockProducer) Stop() {
atomic.AddInt32(&desoBlockProducer.exit, 1)
if atomic.LoadInt32(&desoBlockProducer.isAsleep) == 0 {
desoBlockProducer.producerWaitGroup.Wait()
}
close(desoBlockProducer.exitChan)
desoBlockProducer.exitGroup.Wait()
}

func (desoBlockProducer *DeSoBlockProducer) GetRecentBlock(blockHash *BlockHash) *MsgDeSoBlock {
@@ -591,35 +588,40 @@ func (desoBlockProducer *DeSoBlockProducer) Start() {
return
}

desoBlockProducer.startGroup.Add(1)
desoBlockProducer.exitGroup.Add(1)
go desoBlockProducer.start()
desoBlockProducer.startGroup.Wait()
}

func (desoBlockProducer *DeSoBlockProducer) start() {
// Set the time to a nil value so we run on the first iteration of the loop.
var lastBlockUpdate time.Time
desoBlockProducer.producerWaitGroup.Add(1)

sleepDuration := 0 * time.Second
for {
if atomic.LoadInt32(&desoBlockProducer.exit) >= 0 {
desoBlockProducer.producerWaitGroup.Done()
select {
case <-desoBlockProducer.exitChan:
desoBlockProducer.exitGroup.Done()
return
}

secondsLeft := float64(desoBlockProducer.minBlockUpdateIntervalSeconds) - time.Since(lastBlockUpdate).Seconds()
if !lastBlockUpdate.IsZero() && secondsLeft > 0 {
glog.V(1).Infof("Sleeping for %v seconds before producing next block template...", secondsLeft)
atomic.AddInt32(&desoBlockProducer.isAsleep, 1)
time.Sleep(time.Duration(math.Ceil(secondsLeft)) * time.Second)
atomic.AddInt32(&desoBlockProducer.isAsleep, -1)
continue
}
case <-time.After(sleepDuration):
secondsLeft := float64(desoBlockProducer.minBlockUpdateIntervalSeconds) - time.Since(lastBlockUpdate).Seconds()
if !lastBlockUpdate.IsZero() && secondsLeft > 0 {
glog.V(1).Infof("Sleeping for %v seconds before producing next block template...", secondsLeft)
sleepDuration = time.Duration(math.Ceil(secondsLeft)) * time.Second
continue
}

// Update the time so start the clock for the next iteration.
lastBlockUpdate = time.Now()
// Update the time so start the clock for the next iteration.
lastBlockUpdate = time.Now()

glog.V(1).Infof("Producing block template...")
err := desoBlockProducer.UpdateLatestBlockTemplate()
if err != nil {
// If we hit an error, log it and sleep for a second. This could happen due to us
// being in the middle of processing a block or something.
glog.Errorf("Error producing block template: %v", err)
time.Sleep(time.Second)
glog.V(1).Infof("Producing block template...")
err := desoBlockProducer.UpdateLatestBlockTemplate()
if err != nil {
// If we hit an error, log it and sleep for a second. This could happen due to us
// being in the middle of processing a block or something.
glog.Errorf("Error producing block template: %v", err)
sleepDuration = time.Second
}
}
}
}
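The block producer's shutdown now follows a channel-plus-WaitGroup pattern: the goroutine selects on exitChan and a timer instead of sleeping while polling atomic flags, and Stop closes exitChan and waits on exitGroup. Below is a stripped-down sketch of that pattern under hypothetical names (producer, run); it is not the full DeSoBlockProducer, only the start/stop machinery introduced above.

package main

import (
	"fmt"
	"sync"
	"time"
)

// producer is a hypothetical stand-in for DeSoBlockProducer, showing only the
// start/stop machinery from the diff above.
type producer struct {
	startGroup sync.WaitGroup
	exitChan   chan struct{}
	exitGroup  sync.WaitGroup
}

func newProducer() *producer {
	return &producer{exitChan: make(chan struct{})}
}

func (p *producer) Start() {
	p.startGroup.Add(1)
	p.exitGroup.Add(1)
	go p.run()
	p.startGroup.Wait() // block until the goroutine has actually started
}

func (p *producer) run() {
	p.startGroup.Done()
	sleepDuration := 0 * time.Second
	for {
		select {
		case <-p.exitChan:
			// Stop() closed the channel; exit immediately, even if we were
			// in the middle of a long wait.
			p.exitGroup.Done()
			return
		case <-time.After(sleepDuration):
			fmt.Println("producing block template...")
			sleepDuration = time.Second // back off before the next iteration
		}
	}
}

func (p *producer) Stop() {
	close(p.exitChan)  // signal the goroutine to exit
	p.exitGroup.Wait() // wait until it has returned
}

func main() {
	p := newProducer()
	p.Start()
	time.Sleep(1500 * time.Millisecond)
	p.Stop()
}

Closing the channel (rather than sending on it) means the exit signal reaches the goroutine no matter which select arm it is waiting on, which is what removes the need for the old isAsleep bookkeeping.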
214 changes: 1 addition & 213 deletions lib/connection_controller.go
@@ -4,7 +4,6 @@ import (
"fmt"
"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
"github.com/deso-protocol/core/bls"
"github.com/deso-protocol/core/collections"
"github.com/deso-protocol/core/consensus"
@@ -109,11 +108,11 @@ func (cc *ConnectionController) Start() {
go cc.startRemoteNodeCleanup()

cc.startGroup.Wait()
cc.exitGroup.Add(4)
}

func (cc *ConnectionController) Stop() {
if !DisableNetworkManagerRoutines {
cc.exitGroup.Add(4)
close(cc.exitChan)
cc.exitGroup.Wait()
}
@@ -177,67 +176,6 @@ func (cc *ConnectionController) startNonValidatorConnector() {
}
}

// Must be run inside a goroutine. Relays addresses to peers at regular intervals
// and relays our own address to peers once every 24 hours.
func (cc *ConnectionController) startAddressRelayer() {
cc.startGroup.Done()
numMinutesPassed := 0
for {
select {
case <-cc.exitChan:
cc.exitGroup.Done()
return
case <-time.After(AddrRelayIntervalSeconds * time.Second):
// For the first ten minutes after the connection controller starts, relay our address to all
// peers. After the first ten minutes, do it once every 24 hours.
glog.V(1).Infof("ConnectionController.startAddressRelayer: Relaying our own addr to peers")
if numMinutesPassed < 10 || numMinutesPassed%(RebroadcastNodeAddrIntervalMinutes) == 0 {
// TODO: Change to retrieve all RemoteNodes from the indexer.
for _, pp := range cc.cmgr.GetAllPeers() {
bestAddress := cc.AddrMgr.GetBestLocalAddress(pp.netAddr)
if bestAddress != nil {
glog.V(2).Infof("ConnectionController.startAddressRelayer: Relaying address %v to "+
"peer %v", bestAddress.IP.String(), pp)
if err := cc.cmgr.SendMessage(&MsgDeSoAddr{
AddrList: []*SingleAddr{
{
Timestamp: time.Now(),
IP: bestAddress.IP,
Port: bestAddress.Port,
Services: (ServiceFlag)(bestAddress.Services),
},
},
}, pp.ID); err != nil {
glog.Errorf("ConnectionController.startAddressRelayer: Problem sending "+
"MsgDeSoAddr to peer %v: %v", pp, err)
}
}
}
}

glog.V(2).Infof("ConnectionController.startAddressRelayer: Seeing if there are addrs to relay...")
// Broadcast the addrs we have to all of our peers.
addrsToBroadcast := cc.getAddrsToBroadcast()
if len(addrsToBroadcast) == 0 {
glog.V(2).Infof("ConnectionController.startAddressRelayer: No addrs to relay.")
time.Sleep(AddrRelayIntervalSeconds * time.Second)
continue
}

glog.V(2).Infof("ConnectionController.startAddressRelayer: Found %d addrs to "+
"relay: %v", len(addrsToBroadcast), spew.Sdump(addrsToBroadcast))
// Iterate over all our peers and broadcast the addrs to all of them.
for _, pp := range cc.cmgr.GetAllPeers() {
pp.AddDeSoMessage(&MsgDeSoAddr{
AddrList: addrsToBroadcast,
}, false)
}
time.Sleep(AddrRelayIntervalSeconds * time.Second)
numMinutesPassed++
}
}
}

func (cc *ConnectionController) startRemoteNodeCleanup() {
cc.startGroup.Done()

@@ -274,117 +212,6 @@ func (cc *ConnectionController) _handleDonePeerMessage(origin *Peer, desoMsg DeS
}
}

func (cc *ConnectionController) _handleAddrMessage(origin *Peer, desoMsg DeSoMessage) {
if desoMsg.GetMsgType() != MsgTypeAddr {
return
}

id := NewRemoteNodeId(origin.ID)
var msg *MsgDeSoAddr
var ok bool
if msg, ok = desoMsg.(*MsgDeSoAddr); !ok {
glog.Errorf("ConnectionController._handleAddrMessage: Problem decoding "+
"MsgDeSoAddr: %v", spew.Sdump(desoMsg))
cc.rnManager.DisconnectById(id)
return
}

cc.addrsToBroadcastLock.Lock()
defer cc.addrsToBroadcastLock.Unlock()

glog.V(1).Infof("ConnectionController._handleAddrMessage: Received Addr from peer %v with addrs %v", origin, spew.Sdump(msg.AddrList))

// If this addr message contains more than the maximum allowed number of addresses
// then disconnect this peer.
if len(msg.AddrList) > MaxAddrsPerAddrMsg {
glog.Errorf(fmt.Sprintf("ConnectionController._handleAddrMessage: Disconnecting "+
"Peer %v for sending us an addr message with %d transactions, which exceeds "+
"the max allowed %d",
origin, len(msg.AddrList), MaxAddrsPerAddrMsg))

cc.rnManager.DisconnectById(id)
return
}

// Add all the addresses we received to the addrmgr.
netAddrsReceived := []*wire.NetAddress{}
for _, addr := range msg.AddrList {
addrAsNetAddr := wire.NewNetAddressIPPort(addr.IP, addr.Port, (wire.ServiceFlag)(addr.Services))
if !addrmgr.IsRoutable(addrAsNetAddr) {
glog.V(1).Infof("Dropping address %v from peer %v because it is not routable", addr, origin)
continue
}

netAddrsReceived = append(
netAddrsReceived, addrAsNetAddr)
}
cc.AddrMgr.AddAddresses(netAddrsReceived, origin.netAddr)

// If the message had <= 10 addrs in it, then queue all the addresses for relaying
// on the next cycle.
if len(msg.AddrList) <= 10 {
glog.V(1).Infof("ConnectionController._handleAddrMessage: Queueing %d addrs for forwarding from "+
"peer %v", len(msg.AddrList), origin)
sourceAddr := &SingleAddr{
Timestamp: time.Now(),
IP: origin.netAddr.IP,
Port: origin.netAddr.Port,
Services: origin.serviceFlags,
}
listToAddTo, hasSeenSource := cc.addrsToBroadcast[sourceAddr.StringWithPort(false /*includePort*/)]
if !hasSeenSource {
listToAddTo = []*SingleAddr{}
}
// If this peer has been sending us a lot of little crap, evict a lot of their
// stuff but don't disconnect.
if len(listToAddTo) > MaxAddrsPerAddrMsg {
listToAddTo = listToAddTo[:MaxAddrsPerAddrMsg/2]
}
listToAddTo = append(listToAddTo, msg.AddrList...)
cc.addrsToBroadcast[sourceAddr.StringWithPort(false /*includePort*/)] = listToAddTo
}
}

func (cc *ConnectionController) _handleGetAddrMessage(origin *Peer, desoMsg DeSoMessage) {
if desoMsg.GetMsgType() != MsgTypeGetAddr {
return
}

id := NewRemoteNodeId(origin.ID)
if _, ok := desoMsg.(*MsgDeSoGetAddr); !ok {
glog.Errorf("ConnectionController._handleAddrMessage: Problem decoding "+
"MsgDeSoAddr: %v", spew.Sdump(desoMsg))
cc.rnManager.DisconnectById(id)
return
}

glog.V(1).Infof("Server._handleGetAddrMessage: Received GetAddr from peer %v", origin)
// When we get a GetAddr message, choose MaxAddrsPerMsg from the AddrMgr
// and send them back to the peer.
netAddrsFound := cc.AddrMgr.AddressCache()
if len(netAddrsFound) > MaxAddrsPerAddrMsg {
netAddrsFound = netAddrsFound[:MaxAddrsPerAddrMsg]
}

// Convert the list to a SingleAddr list.
res := &MsgDeSoAddr{}
for _, netAddr := range netAddrsFound {
singleAddr := &SingleAddr{
Timestamp: time.Now(),
IP: netAddr.IP,
Port: netAddr.Port,
Services: (ServiceFlag)(netAddr.Services),
}
res.AddrList = append(res.AddrList, singleAddr)
}
rn := cc.rnManager.GetRemoteNodeById(id)
if err := cc.rnManager.SendMessage(rn, res); err != nil {
glog.Errorf("Server._handleGetAddrMessage: Problem sending addr message to peer %v: %v", origin, err)
cc.rnManager.DisconnectById(id)
return
}
}

// _handleNewConnectionMessage is called when a new outbound or inbound connection is established. It is responsible
// for creating a RemoteNode from the connection and initiating the handshake. The incoming DeSoMessage is a control message.
func (cc *ConnectionController) _handleNewConnectionMessage(origin *Peer, desoMsg DeSoMessage) {
@@ -901,42 +728,3 @@ func (cc *ConnectionController) isDuplicateInboundIPAddress(addr net.Addr) bool

return cc.cmgr.IsDuplicateInboundIPAddress(netAddr)
}

func (cc *ConnectionController) getAddrsToBroadcast() []*SingleAddr {
cc.addrsToBroadcastLock.Lock()
defer cc.addrsToBroadcastLock.Unlock()

// If there's nothing in the map, return.
if len(cc.addrsToBroadcast) == 0 {
return []*SingleAddr{}
}

// If we get here then we have some addresses to broadcast.
addrsToBroadcast := []*SingleAddr{}
for uint32(len(addrsToBroadcast)) < cc.params.MaxAddressesToBroadcast &&
len(cc.addrsToBroadcast) > 0 {
// Choose a key at random. This works because map iteration is random in golang.
bucket := ""
for kk := range cc.addrsToBroadcast {
bucket = kk
break
}

// Remove the last element from the slice for the given bucket.
currentAddrList := cc.addrsToBroadcast[bucket]
if len(currentAddrList) > 0 {
lastIndex := len(currentAddrList) - 1
currentAddr := currentAddrList[lastIndex]
currentAddrList = currentAddrList[:lastIndex]
if len(currentAddrList) == 0 {
delete(cc.addrsToBroadcast, bucket)
} else {
cc.addrsToBroadcast[bucket] = currentAddrList
}

addrsToBroadcast = append(addrsToBroadcast, currentAddr)
}
}

return addrsToBroadcast
}
(Diffs for the remaining 6 changed files are not shown.)
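ConnectionController uses the same lifecycle for its routines: Start launches them and blocks on startGroup until they are all running, each routine selects on exitChan, and Stop closes exitChan and waits on exitGroup. (The exitGroup.Add(4) call appears next to startGroup.Wait() in Start on one side of the diff and just before close(exitChan) in Stop on the other; either way, the invariant is that it runs before the channel is closed.) A minimal sketch of this lifecycle with hypothetical routine bodies:

package main

import (
	"fmt"
	"sync"
	"time"
)

// controller is a hypothetical stand-in for ConnectionController, showing only
// the routine lifecycle visible in the diff: each long-running routine signals
// that it has started via startGroup and that it has exited via exitGroup.
type controller struct {
	startGroup sync.WaitGroup
	exitChan   chan struct{}
	exitGroup  sync.WaitGroup
}

func newController() *controller {
	return &controller{exitChan: make(chan struct{})}
}

// runRoutine mirrors the shape of routines like startNonValidatorConnector and
// startRemoteNodeCleanup; the periodic body here is illustrative only.
func (c *controller) runRoutine(name string, interval time.Duration) {
	c.startGroup.Done()
	for {
		select {
		case <-c.exitChan:
			c.exitGroup.Done()
			return
		case <-time.After(interval):
			fmt.Printf("%s: periodic tick\n", name)
		}
	}
}

func (c *controller) Start() {
	const numRoutines = 4
	c.startGroup.Add(numRoutines)
	// The Add for exitGroup must happen before exitChan is closed; in this
	// sketch it lives in Start, next to the matching startGroup.Add.
	c.exitGroup.Add(numRoutines)
	for i := 0; i < numRoutines; i++ {
		go c.runRoutine(fmt.Sprintf("routine-%d", i), 100*time.Millisecond)
	}
	c.startGroup.Wait() // all routines are running before Start returns
}

func (c *controller) Stop() {
	close(c.exitChan)  // every routine's select unblocks on the closed channel
	c.exitGroup.Wait() // Stop returns only after all routines have exited
}

func main() {
	c := newController()
	c.Start()
	time.Sleep(250 * time.Millisecond)
	c.Stop()
}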
