Skip to content

Commit

Permalink
Add ConnectionController
Browse files Browse the repository at this point in the history
  • Loading branch information
AeonSw4n authored and tholonious committed Dec 15, 2023
1 parent 31974f6 commit de6f7c6
Show file tree
Hide file tree
Showing 8 changed files with 818 additions and 459 deletions.
10 changes: 6 additions & 4 deletions cmd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ func (node *Node) Start(exitChannels ...*chan struct{}) {

// This just gets localhost listening addresses on the protocol port.
// Such as [{127.0.0.1 18000 } {::1 18000 }], and associated listener structs.
listeningAddrs, listeners := GetAddrsToListenOn(node.Config.ProtocolPort)
_ = listeningAddrs
_, node.Listeners = GetAddrsToListenOn(node.Config.ProtocolPort)

// If --connect-ips is not passed, we will connect the addresses from
// --add-ips, DNSSeeds, and DNSSeedGenerators.
Expand Down Expand Up @@ -203,13 +202,15 @@ func (node *Node) Start(exitChannels ...*chan struct{}) {
// Setup eventManager
eventManager := lib.NewEventManager()

blsKeystore, err := lib.NewBLSKeystore(node.Config.PosValidatorSeed)

// Setup the server. ShouldRestart is used whenever we detect an issue and should restart the node after a recovery
// process, just in case. These issues usually arise when the node was shutdown unexpectedly mid-operation. The node
// performs regular health checks to detect whenever this occurs.
shouldRestart := false
node.Server, err, shouldRestart = lib.NewServer(
node.Params,
listeners,
node.Listeners,
desoAddrMgr,
node.Config.ConnectIPs,
node.ChainDB,
Expand Down Expand Up @@ -244,7 +245,8 @@ func (node *Node) Start(exitChannels ...*chan struct{}) {
node.nodeMessageChan,
node.Config.ForceChecksum,
node.Config.StateChangeDir,
node.Config.HypersyncMaxQueueSize)
node.Config.HypersyncMaxQueueSize,
blsKeystore)
if err != nil {
// shouldRestart can be true if, on the previous run, we did not finish flushing all ancestral
// records to the DB. In this case, the snapshot is corrupted and needs to be computed. See the
Expand Down
4 changes: 4 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func SetupRunFlags(cmd *cobra.Command) {
cmd.PersistentFlags().String("postgres-uri", "", "BETA: Use Postgres as the backing store for chain data."+
"When enabled, most data is stored in postgres although badger is still currently used for some state. Run your "+
"Postgres instance on the same machine as your node for optimal performance.")
cmd.PersistentFlags().String("pos-validator-seed", "", "The private key of the Proof of Stake validator. "+
"The private key should be passed as hex, optionally prefixed with a '0x', and map to a valid BLS12_381 private key. "+
"The private key must be 32 bytes, or 64 characters, in length (excluding the '0x' prefix). Setting this flag automatically "+
"makes the node run Proof of Stake Validator.")
cmd.PersistentFlags().Uint32("max-sync-block-height", 0,
"Max sync block height")
// Hyper Sync
Expand Down
41 changes: 30 additions & 11 deletions integration_testing/connection_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,13 @@ func (bridge *ConnectionBridge) createOutboundConnection(node *cmd.Node, otherNo

na, err := lib.IPToNetAddr(conn.RemoteAddr().String(), otherNode.Server.GetConnectionManager().AddrMgr,
otherNode.Params)
messagesFromPeer := make(chan *lib.ServerMessage)
peer := lib.NewPeer(conn, false, na, false,
10000, 0, bridge.nodeB.Params,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny)
peer.ID = uint64(lib.RandInt64(math.MaxInt64))
messagesFromPeer := make(chan *lib.ServerMessage, 100)
newPeerChan := make(chan *lib.Peer, 100)
donePeerChan := make(chan *lib.Peer, 100)
peer := lib.NewPeer(uint64(lib.RandInt64(math.MaxInt64)), uint64(lib.RandInt64(math.MaxInt64)),
conn, false, na, false, 10000, 0,
bridge.nodeB.Params, messagesFromPeer, nil, nil, lib.NodeSyncTypeAny,
newPeerChan, donePeerChan)
bridge.newPeerChan <- peer
//}
}(ll)
Expand All @@ -160,7 +162,7 @@ func (bridge *ConnectionBridge) createOutboundConnection(node *cmd.Node, otherNo
// getVersionMessage simulates a version message that the provided node would have sent.
func (bridge *ConnectionBridge) getVersionMessage(node *cmd.Node) *lib.MsgDeSoVersion {
ver := lib.NewMessage(lib.MsgTypeVersion).(*lib.MsgDeSoVersion)
ver.Version = node.Params.ProtocolVersion
ver.Version = node.Params.ProtocolVersion.ToUint64()
ver.TstampSecs = time.Now().Unix()
ver.Nonce = uint64(lib.RandInt64(math.MaxInt64))
ver.UserAgent = node.Params.UserAgent
Expand All @@ -179,6 +181,23 @@ func (bridge *ConnectionBridge) getVersionMessage(node *cmd.Node) *lib.MsgDeSoVe
return ver
}

func ReadWithTimeout(readFunc func() error, readTimeout time.Duration) error {
errChan := make(chan error)
go func() {
errChan <- readFunc()
}()
select {
case err := <-errChan:
{
return err
}
case <-time.After(readTimeout):
{
return fmt.Errorf("ReadWithTimeout: Timed out reading message")
}
}
}

// startConnection starts the connection by performing version and verack exchange with
// the provided connection, pretending to be the otherNode.
func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode *cmd.Node) error {
Expand All @@ -193,7 +212,7 @@ func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode
}

// Wait for a response to the version message.
if err := connection.ReadWithTimeout(
if err := ReadWithTimeout(
func() error {
msg, err := connection.ReadDeSoMessage()
if err != nil {
Expand All @@ -216,15 +235,15 @@ func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode

// Now prepare the verack message.
verackMsg := lib.NewMessage(lib.MsgTypeVerack)
verackMsg.(*lib.MsgDeSoVerack).Nonce = connection.VersionNonceReceived
verackMsg.(*lib.MsgDeSoVerack).NonceReceived = connection.VersionNonceReceived

// And send it to the connection.
if err := connection.WriteDeSoMessage(verackMsg); err != nil {
return err
}

// And finally wait for connection's response to the verack message.
if err := connection.ReadWithTimeout(
if err := ReadWithTimeout(
func() error {
msg, err := connection.ReadDeSoMessage()
if err != nil {
Expand All @@ -235,9 +254,9 @@ func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode
return fmt.Errorf("message is not verack! Type: %v", msg.GetMsgType())
}
verackMsg := msg.(*lib.MsgDeSoVerack)
if verackMsg.Nonce != connection.VersionNonceSent {
if verackMsg.NonceReceived != connection.VersionNonceSent {
return fmt.Errorf("verack message nonce doesn't match (received: %v, sent: %v)",
verackMsg.Nonce, connection.VersionNonceSent)
verackMsg.NonceReceived, connection.VersionNonceSent)
}
return nil
}, lib.DeSoMainnetParams.VersionNegotiationTimeout); err != nil {
Expand Down
Loading

0 comments on commit de6f7c6

Please sign in to comment.