Skip to content

Commit

Permalink
Revert "Revert routine stops"
Browse files Browse the repository at this point in the history
This reverts commit 3ec3f80.
  • Loading branch information
AeonSw4n committed Jan 29, 2024
1 parent b419748 commit 32fe07d
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 321 deletions.
68 changes: 35 additions & 33 deletions lib/block_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

"github.com/btcsuite/btcd/wire"
Expand All @@ -19,6 +18,10 @@ import (
)

type DeSoBlockProducer struct {
startGroup sync.WaitGroup
exitChan chan struct{}
exitGroup sync.WaitGroup

// The minimum amount of time we wait before trying to produce a new block
// template. If this value is set low enough then we will produce a block template
// continuously.
Expand Down Expand Up @@ -47,11 +50,6 @@ type DeSoBlockProducer struct {

// producerWaitGroup allows us to wait until the producer has properly closed.
producerWaitGroup sync.WaitGroup
// exit is used to signal that DeSoBlockProducer routines should be terminated.
exit int32
// isAsleep is a helper variable for quitting that indicates whether the DeSoBlockProducer is asleep. While producing
// blocks, we sleep for a few seconds. Instead of waiting for the sleep to finish, we use this variable to quit immediately.
isAsleep int32
}

type BlockTemplateStats struct {
Expand Down Expand Up @@ -102,6 +100,7 @@ func NewDeSoBlockProducer(
chain: chain,
params: params,
postgres: postgres,
exitChan: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -353,10 +352,8 @@ func (desoBlockProducer *DeSoBlockProducer) _getBlockTemplate(publicKey []byte)
}

func (desoBlockProducer *DeSoBlockProducer) Stop() {
atomic.AddInt32(&desoBlockProducer.exit, 1)
if atomic.LoadInt32(&desoBlockProducer.isAsleep) == 0 {
desoBlockProducer.producerWaitGroup.Wait()
}
close(desoBlockProducer.exitChan)
desoBlockProducer.exitGroup.Wait()
}

func (desoBlockProducer *DeSoBlockProducer) GetRecentBlock(blockHash *BlockHash) *MsgDeSoBlock {
Expand Down Expand Up @@ -591,35 +588,40 @@ func (desoBlockProducer *DeSoBlockProducer) Start() {
return
}

desoBlockProducer.startGroup.Add(1)
desoBlockProducer.exitGroup.Add(1)
go desoBlockProducer.start()
desoBlockProducer.startGroup.Wait()
}

func (desoBlockProducer *DeSoBlockProducer) start() {
// Set the time to a nil value so we run on the first iteration of the loop.
var lastBlockUpdate time.Time
desoBlockProducer.producerWaitGroup.Add(1)

sleepDuration := 0 * time.Second
for {
if atomic.LoadInt32(&desoBlockProducer.exit) >= 0 {
desoBlockProducer.producerWaitGroup.Done()
select {
case <-desoBlockProducer.exitChan:
desoBlockProducer.exitGroup.Done()
return
}

secondsLeft := float64(desoBlockProducer.minBlockUpdateIntervalSeconds) - time.Since(lastBlockUpdate).Seconds()
if !lastBlockUpdate.IsZero() && secondsLeft > 0 {
glog.V(1).Infof("Sleeping for %v seconds before producing next block template...", secondsLeft)
atomic.AddInt32(&desoBlockProducer.isAsleep, 1)
time.Sleep(time.Duration(math.Ceil(secondsLeft)) * time.Second)
atomic.AddInt32(&desoBlockProducer.isAsleep, -1)
continue
}
case <-time.After(sleepDuration):
secondsLeft := float64(desoBlockProducer.minBlockUpdateIntervalSeconds) - time.Since(lastBlockUpdate).Seconds()
if !lastBlockUpdate.IsZero() && secondsLeft > 0 {
glog.V(1).Infof("Sleeping for %v seconds before producing next block template...", secondsLeft)
sleepDuration = time.Duration(math.Ceil(secondsLeft)) * time.Second
continue
}

// Update the time so start the clock for the next iteration.
lastBlockUpdate = time.Now()
// Update the time so start the clock for the next iteration.
lastBlockUpdate = time.Now()

glog.V(1).Infof("Producing block template...")
err := desoBlockProducer.UpdateLatestBlockTemplate()
if err != nil {
// If we hit an error, log it and sleep for a second. This could happen due to us
// being in the middle of processing a block or something.
glog.Errorf("Error producing block template: %v", err)
time.Sleep(time.Second)
glog.V(1).Infof("Producing block template...")
err := desoBlockProducer.UpdateLatestBlockTemplate()
if err != nil {
// If we hit an error, log it and sleep for a second. This could happen due to us
// being in the middle of processing a block or something.
glog.Errorf("Error producing block template: %v", err)
sleepDuration = time.Second
}
}
}
}
169 changes: 79 additions & 90 deletions lib/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,10 @@ type ConnectionManager struct {

minFeeRateNanosPerKB uint64

// More chans we might want. modifyRebroadcastInv chan interface{}
shutdown int32
startGroup sync.WaitGroup
listenerGroup sync.WaitGroup
exitChan chan struct{}
exitGroup sync.WaitGroup
}

func NewConnectionManager(
Expand Down Expand Up @@ -160,6 +162,7 @@ func NewConnectionManager(
serverMessageQueue: _serverMessageQueue,
stallTimeoutSeconds: _stallTimeoutSeconds,
minFeeRateNanosPerKB: _minFeeRateNanosPerKB,
exitChan: make(chan struct{}),
}
}

Expand Down Expand Up @@ -339,30 +342,23 @@ func (cmgr *ConnectionManager) IsDuplicateInboundIPAddress(netAddr *wire.NetAddr
return false
}

func (cmgr *ConnectionManager) _handleInboundConnections() {
for _, outerListener := range cmgr.listeners {
go func(ll net.Listener) {
for {
conn, err := ll.Accept()
if conn == nil {
return
}
glog.V(2).Infof("_handleInboundConnections: received connection from: local %v, remote %v",
conn.LocalAddr().String(), conn.RemoteAddr().String())
if atomic.LoadInt32(&cmgr.shutdown) != 0 {
glog.Info("_handleInboundConnections: Ignoring connection due to shutdown")
return
}
if err != nil {
glog.Errorf("_handleInboundConnections: Can't accept connection: %v", err)
continue
}

cmgr.inboundConnectionChan <- &inboundConnection{
connection: conn,
}
}
}(outerListener)
func (cmgr *ConnectionManager) _handleListener(ll net.Listener) {
cmgr.listenerGroup.Done()
for {
conn, err := ll.Accept()
if conn == nil {
return
}
glog.V(2).Infof("_handleListener: received connection from: local %v, remote %v",
conn.LocalAddr().String(), conn.RemoteAddr().String())
if err != nil {
glog.Errorf("_handleListener: Can't accept connection: %v", err)
continue
}

cmgr.inboundConnectionChan <- &inboundConnection{
connection: conn,
}
}
}

Expand Down Expand Up @@ -544,76 +540,60 @@ func (cmgr *ConnectionManager) GetNumOutboundPeers() uint32 {
}

func (cmgr *ConnectionManager) Stop() {
cmgr.mtxPeerMaps.Lock()
defer cmgr.mtxPeerMaps.Unlock()
cmgr.exitGroup.Add(2)
close(cmgr.exitChan)
cmgr.exitGroup.Wait()

if atomic.AddInt32(&cmgr.shutdown, 1) != 1 {
glog.Warningf("ConnectionManager.Stop is already in the process of " +
"shutting down")
return
// Close all of the listeners.
for _, listener := range cmgr.listeners {
_ = listener.Close()
}

for id := range cmgr.outboundConnectionAttempts {
cmgr.CloseAttemptedConnection(id)
}
glog.Infof("ConnectionManager: Stopping, number of inbound peers (%v), number of outbound "+
glog.Infof("ConnectionManager.Stop: Stopping, number of inbound peers (%v), number of outbound "+
"peers (%v), number of persistent peers (%v).", len(cmgr.inboundPeers), len(cmgr.outboundPeers),
len(cmgr.persistentPeers))
for _, peer := range cmgr.inboundPeers {
glog.V(1).Infof(CLog(Red, fmt.Sprintf("ConnectionManager.Stop: Inbound peer (%v)", peer)))
peer.Disconnect()
}
for _, peer := range cmgr.outboundPeers {
glog.V(1).Infof("ConnectionManager.Stop: Outbound peer (%v)", peer)
peer.Disconnect()
}
for _, peer := range cmgr.persistentPeers {
glog.V(1).Infof("ConnectionManager.Stop: Persistent peer (%v)", peer)
for _, peer := range cmgr.connectedPeers {
glog.V(1).Infof(CLog(Red, fmt.Sprintf("ConnectionManager.Stop: Stopping peer (id= %v)", peer.GetId())))
peer.Disconnect()
}

// Close all of the listeners.
for _, listener := range cmgr.listeners {
_ = listener.Close()
}
}

func (cmgr *ConnectionManager) Start() {
// Below is a basic description of the ConnectionManager's main loop:
//
// We have listeners (for inbound connections) and we have an addrmgr (for outbound connections).
// Specify TargetOutbound connections we want to have.
// Create TargetOutbound connection objects each with their own id.
// Add these connection objects to a map of some sort.
// Initiate TargetOutbound connections to peers using the addrmgr.
// When a connection fails, remove that connection from the map and try another connection in its place. Wait for that connection to return. Repeat.
// - If a connection has failed a few times then add a retryduration (since we're probably out of addresses).
// - If you can't connect to a node because the addrmgr returned nil, wait some amount of time and then try again.
// When a connection succeeds:
// - Send the peer a version message.
// - Read a version message from the peer.
// - Wait for the above two steps to return.
// - If the above steps don't return, then disconnect from the peer as above. Try to reconnect to another peer.
// If the steps above succeed
// - Have the peer enter a switch statement listening for all kinds of messages.
// - Send addr and getaddr messages as appropriate.

// Accept inbound connections from peers on our listeners.
cmgr._handleInboundConnections()

glog.Infof("Full node socket initialized")
cmgr.startGroup.Add(2)
cmgr.startListenersHandler()
go cmgr.startConnectionsHandler()
go cmgr.startPeerDisconnectHandler()
cmgr.startGroup.Wait()
}

func (cmgr *ConnectionManager) startListenersHandler() {
for _, ll := range cmgr.listeners {
cmgr.listenerGroup.Add(1)
go cmgr._handleListener(ll)
}
cmgr.listenerGroup.Wait()
}

func (cmgr *ConnectionManager) startConnectionsHandler() {
cmgr.startGroup.Done()
for {
// Log some data for each event.
cmgr._logOutboundPeerData()

select {
case <-cmgr.exitChan:
cmgr.exitGroup.Done()
return
case oc := <-cmgr.outboundConnectionChan:
if oc.failed {
glog.V(2).Infof("ConnectionManager.Start: Failed to establish an outbound connection with "+
"(id= %v)", oc.attemptId)
glog.V(2).Infof("ConnectionManager.startConnectionsHandler: Failed to establish an outbound "+
"connection with (id= %v)", oc.attemptId)
} else {
glog.V(2).Infof("ConnectionManager.Start: Successfully established an outbound connection with "+
"(addr= %v) (id= %v)", oc.connection.RemoteAddr(), oc.attemptId)
glog.V(2).Infof("ConnectionManager.startConnectionsHandler: Successfully established an outbound "+
"connection with (addr= %v) (id= %v)", oc.connection.RemoteAddr(), oc.attemptId)
}
cmgr.mtxConnectionAttempts.Lock()
delete(cmgr.outboundConnectionAttempts, oc.attemptId)
Expand All @@ -633,26 +613,35 @@ func (cmgr *ConnectionManager) Start() {
Connection: ic,
},
}
}
}
}

func (cmgr *ConnectionManager) startPeerDisconnectHandler() {
cmgr.startGroup.Done()
for {
select {
case <-cmgr.exitChan:
cmgr.exitGroup.Done()
return
case pp := <-cmgr.peerDisconnectChan:
{
// By the time we get here, it can be assumed that the Peer's Disconnect function
// has already been called, since that is what's responsible for adding the peer
// to this queue in the first place.
// By the time we get here, it can be assumed that the Peer's Disconnect function
// has already been called, since that is what's responsible for adding the peer
// to this queue in the first place.

glog.V(1).Infof("Done with peer (id=%v).", pp.ID)
glog.V(1).Infof("Done with peer (id=%v).", pp.ID)

// Remove the peer from our data structures.
cmgr.removePeer(pp)
// Remove the peer from our data structures.
cmgr.removePeer(pp)

// Potentially replace the peer. For example, if the Peer was an outbound Peer
// then we want to find a new peer in order to maintain our TargetOutboundPeers.
// Potentially replace the peer. For example, if the Peer was an outbound Peer
// then we want to find a new peer in order to maintain our TargetOutboundPeers.

// Signal the server about the Peer being done in case it wants to do something
// with it.
cmgr.serverMessageQueue <- &ServerMessage{
Peer: pp,
Msg: &MsgDeSoDisconnectedPeer{},
}
// Signal the server about the Peer being done in case it wants to do something
// with it.
cmgr.serverMessageQueue <- &ServerMessage{
Peer: pp,
Msg: &MsgDeSoDisconnectedPeer{},
}
}
}
Expand Down
Loading

0 comments on commit 32fe07d

Please sign in to comment.