Skip to content

Commit

Permalink
Add HandshakeController
Browse files Browse the repository at this point in the history
PoS Block Producer: TxnConnectStatusByIndex (#672)

* TransactionConnectStatus and ConnectFailingTransaction

* Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions"

This reverts commit d3e543c4c3e6f03cc74087b05c268d4449ba1689, reversing
changes made to 960001c.

* Revert "Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions""

This reverts commit 10a147654c5147c28ec674d0650bb54c8d9cebce.

* Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions"

This reverts commit d3e543c4c3e6f03cc74087b05c268d4449ba1689, reversing
changes made to a9f7827.

* TransactionConnectStatus and ConnectFailingTransaction

* Initial _connectFailingTransaction

* ConnectFailingTransaction and GlobalParamsEntry updates

* Fix merge conflicts

* gofmt

* Fix merge conflicts

* Fix blockheight

* Fix merge conflicts

* gofmt

* Revert connect failing transaction

* Add TxnStatusConnectedIndex to block and header

* Fix naming

* Fix tests; remove asserts

* Update comment

Integration testing updates

PoS Block Producer: TxnConnectStatusByIndex (#672)

* TransactionConnectStatus and ConnectFailingTransaction

* Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions"

This reverts commit d3e543c4c3e6f03cc74087b05c268d4449ba1689, reversing
changes made to 960001c.

* Revert "Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions""

This reverts commit 10a147654c5147c28ec674d0650bb54c8d9cebce.

* Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions"

This reverts commit d3e543c4c3e6f03cc74087b05c268d4449ba1689, reversing
changes made to a9f7827.

* TransactionConnectStatus and ConnectFailingTransaction

* Initial _connectFailingTransaction

* ConnectFailingTransaction and GlobalParamsEntry updates

* Fix merge conflicts

* gofmt

* Fix merge conflicts

* Fix blockheight

* Fix merge conflicts

* gofmt

* Revert connect failing transaction

* Add TxnStatusConnectedIndex to block and header

* Fix naming

* Fix tests; remove asserts

* Update comment

RemoteNode and RemoteNodeId

Initial remote node manager tests

remote node tests

Better connection testing framework

Add validator integration test

Fix validator-validator connection test; Add nonValidator-validator test

Simplify indices

Simplify remote node indexer; fix compilation

Simplify RemoteNodeManager

More RemoteNodeManager updates
  • Loading branch information
AeonSw4n committed Jan 10, 2024
1 parent 32393e6 commit fa1a29a
Show file tree
Hide file tree
Showing 14 changed files with 1,169 additions and 347 deletions.
9 changes: 5 additions & 4 deletions integration_testing/blocksync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ func TestSimpleBlockSync(t *testing.T) {
// wait for node1 to sync blocks
waitForNodeToFullySync(node1)

// bridge the nodes together.
bridge := NewConnectionBridge(node1, node2)
require.NoError(bridge.Start())
// TODO: Dial an outbound connection from node2 to node1

// wait for node2 to sync blocks.
waitForNodeToFullySync(node2)
Expand Down Expand Up @@ -99,6 +97,7 @@ func TestSimpleSyncRestart(t *testing.T) {
compareNodesByDB(t, node1, node2, 0)
fmt.Println("Random restart successful! Random height was", randomHeight)
fmt.Println("Databases match!")
bridge.Disconnect()
node1.Stop()
node2.Stop()
}
Expand Down Expand Up @@ -153,7 +152,7 @@ func TestSimpleSyncDisconnectWithSwitchingToNewPeer(t *testing.T) {

randomHeight := randomUint32Between(t, 10, config2.MaxSyncBlockHeight)
fmt.Println("Random height for a restart (re-use if test failed):", randomHeight)
disconnectAtBlockHeight(t, node2, bridge12, randomHeight)
disconnectAtBlockHeight(node2, bridge12, randomHeight)

// bridge the nodes together.
bridge23 := NewConnectionBridge(node2, node3)
Expand All @@ -167,6 +166,8 @@ func TestSimpleSyncDisconnectWithSwitchingToNewPeer(t *testing.T) {
compareNodesByDB(t, node3, node2, 0)
fmt.Println("Random restart successful! Random height was", randomHeight)
fmt.Println("Databases match!")
bridge12.Disconnect()
bridge23.Disconnect()
node1.Stop()
node2.Stop()
node3.Stop()
Expand Down
65 changes: 43 additions & 22 deletions integration_testing/connection_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,14 @@ func (bridge *ConnectionBridge) createInboundConnection(node *cmd.Node) *lib.Pee
}

// This channel is redundant in our setting.
messagesFromPeer := make(chan *lib.ServerMessage)
messagesFromPeer := make(chan *lib.ServerMessage, 100)
newPeerChan := make(chan *lib.Peer, 100)
donePeerChan := make(chan *lib.Peer, 100)
// Because it is an inbound Peer of the node, it is simultaneously a "fake" outbound Peer of the bridge.
// Hence, we will mark the _isOutbound parameter as "true" in NewPeer.
peer := lib.NewPeer(conn, true, netAddress, true,
10000, 0, &lib.DeSoMainnetParams,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny)
peer.ID = uint64(lib.RandInt64(math.MaxInt64))
peer := lib.NewPeer(uint64(lib.RandInt64(math.MaxInt64)), conn, true,
netAddress, true, 10000, 0, &lib.DeSoMainnetParams,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, newPeerChan, donePeerChan)
return peer
}

Expand All @@ -139,27 +140,30 @@ func (bridge *ConnectionBridge) createOutboundConnection(node *cmd.Node, otherNo
fmt.Println("createOutboundConnection: Got a connection from remote:", conn.RemoteAddr().String(),
"on listener:", ll.Addr().String())

na, err := lib.IPToNetAddr(conn.RemoteAddr().String(), otherNode.Server.GetConnectionManager().AddrMgr,
otherNode.Params)
messagesFromPeer := make(chan *lib.ServerMessage)
peer := lib.NewPeer(conn, false, na, false,
10000, 0, bridge.nodeB.Params,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny)
peer.ID = uint64(lib.RandInt64(math.MaxInt64))
addrMgr := addrmgr.New("", net.LookupIP)
na, err := lib.IPToNetAddr(conn.RemoteAddr().String(), addrMgr, otherNode.Params)
messagesFromPeer := make(chan *lib.ServerMessage, 100)
newPeerChan := make(chan *lib.Peer, 100)
donePeerChan := make(chan *lib.Peer, 100)
peer := lib.NewPeer(uint64(lib.RandInt64(math.MaxInt64)), conn,
false, na, false, 10000, 0, bridge.nodeB.Params,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, newPeerChan, donePeerChan)
bridge.newPeerChan <- peer
//}
}(ll)

// Make the provided node to make an outbound connection to our listener.
netAddress, _ := lib.IPToNetAddr(ll.Addr().String(), addrmgr.New("", net.LookupIP), &lib.DeSoMainnetParams)
fmt.Println("createOutboundConnection: IP:", netAddress.IP, "Port:", netAddress.Port)
go node.Server.GetConnectionManager().ConnectPeer(nil, netAddress)
//netAddress, _ := lib.IPToNetAddr(ll.Addr().String(), addrmgr.New("", net.LookupIP), &lib.DeSoMainnetParams)
//fmt.Println("createOutboundConnection: IP:", netAddress.IP, "Port:", netAddress.Port)
addrMgr := addrmgr.New("", net.LookupIP)
addr, _ := lib.IPToNetAddr(ll.Addr().String(), addrMgr, node.Params)
go node.Server.GetConnectionManager().DialOutboundConnection(addr, uint64(lib.RandInt64(math.MaxInt64)))
}

// getVersionMessage simulates a version message that the provided node would have sent.
func (bridge *ConnectionBridge) getVersionMessage(node *cmd.Node) *lib.MsgDeSoVersion {
ver := lib.NewMessage(lib.MsgTypeVersion).(*lib.MsgDeSoVersion)
ver.Version = node.Params.ProtocolVersion
ver.Version = node.Params.ProtocolVersion.ToUint64()
ver.TstampSecs = time.Now().Unix()
ver.Nonce = uint64(lib.RandInt64(math.MaxInt64))
ver.UserAgent = node.Params.UserAgent
Expand All @@ -172,12 +176,29 @@ func (bridge *ConnectionBridge) getVersionMessage(node *cmd.Node) *lib.MsgDeSoVe
}

if node.Server != nil {
ver.LatestBlockHeight = uint32(node.Server.GetBlockchain().BlockTip().Header.Height)
ver.LatestBlockHeight = node.Server.GetBlockchain().BlockTip().Header.Height
}
ver.MinFeeRateNanosPerKB = node.Config.MinFeerate
return ver
}

func ReadWithTimeout(readFunc func() error, readTimeout time.Duration) error {
errChan := make(chan error)
go func() {
errChan <- readFunc()
}()
select {
case err := <-errChan:
{
return err
}
case <-time.After(readTimeout):
{
return fmt.Errorf("ReadWithTimeout: Timed out reading message")
}
}
}

// startConnection starts the connection by performing version and verack exchange with
// the provided connection, pretending to be the otherNode.
func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode *cmd.Node) error {
Expand All @@ -192,7 +213,7 @@ func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode
}

// Wait for a response to the version message.
if err := connection.ReadWithTimeout(
if err := ReadWithTimeout(
func() error {
msg, err := connection.ReadDeSoMessage()
if err != nil {
Expand All @@ -215,15 +236,15 @@ func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode

// Now prepare the verack message.
verackMsg := lib.NewMessage(lib.MsgTypeVerack)
verackMsg.(*lib.MsgDeSoVerack).Nonce = connection.VersionNonceReceived
verackMsg.(*lib.MsgDeSoVerack).NonceReceived = connection.VersionNonceReceived

// And send it to the connection.
if err := connection.WriteDeSoMessage(verackMsg); err != nil {
return err
}

// And finally wait for connection's response to the verack message.
if err := connection.ReadWithTimeout(
if err := ReadWithTimeout(
func() error {
msg, err := connection.ReadDeSoMessage()
if err != nil {
Expand All @@ -234,9 +255,9 @@ func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode
return fmt.Errorf("message is not verack! Type: %v", msg.GetMsgType())
}
verackMsg := msg.(*lib.MsgDeSoVerack)
if verackMsg.Nonce != connection.VersionNonceSent {
if verackMsg.NonceReceived != connection.VersionNonceSent {
return fmt.Errorf("verack message nonce doesn't match (received: %v, sent: %v)",
verackMsg.Nonce, connection.VersionNonceSent)
verackMsg.NonceReceived, connection.VersionNonceSent)
}
return nil
}, lib.DeSoMainnetParams.VersionNegotiationTimeout); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions integration_testing/hypersync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestSimpleHyperSync(t *testing.T) {
//compareNodesByDB(t, node1, node2, 0)
compareNodesByChecksum(t, node1, node2)
fmt.Println("Databases match!")
bridge.Disconnect()
node1.Stop()
node2.Stop()
}
Expand Down Expand Up @@ -122,6 +123,8 @@ func TestHyperSyncFromHyperSyncedNode(t *testing.T) {
compareNodesByChecksum(t, node2, node3)

fmt.Println("Databases match!")
bridge12.Disconnect()
bridge23.Disconnect()
node1.Stop()
node2.Stop()
node3.Stop()
Expand Down Expand Up @@ -178,6 +181,7 @@ func TestSimpleHyperSyncRestart(t *testing.T) {
compareNodesByChecksum(t, node1, node2)
fmt.Println("Random restart successful! Random sync prefix was", syncPrefix)
fmt.Println("Databases match!")
bridge.Disconnect()
node1.Stop()
node2.Stop()
}
Expand Down Expand Up @@ -255,6 +259,8 @@ func TestSimpleHyperSyncDisconnectWithSwitchingToNewPeer(t *testing.T) {
compareNodesByChecksum(t, node1, node2)
fmt.Println("Random restart successful! Random sync prefix was", syncPrefix)
fmt.Println("Databases match!")
bridge12.Disconnect()
bridge23.Disconnect()
node1.Stop()
node2.Stop()
node3.Stop()
Expand Down Expand Up @@ -349,6 +355,7 @@ func TestArchivalMode(t *testing.T) {
//compareNodesByDB(t, node1, node2, 0)
compareNodesByChecksum(t, node1, node2)
fmt.Println("Databases match!")
bridge.Disconnect()
node1.Stop()
node2.Stop()
}
Expand Down Expand Up @@ -406,6 +413,9 @@ func TestBlockSyncFromArchivalModeHyperSync(t *testing.T) {
//compareNodesByDB(t, node1, node2, 0)
compareNodesByChecksum(t, node1, node2)
fmt.Println("Databases match!")
bridge12.Disconnect()
bridge23.Disconnect()
node1.Stop()
node2.Stop()
node3.Stop()
}
1 change: 1 addition & 0 deletions integration_testing/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestEncoderMigrations(t *testing.T) {

compareNodesByChecksum(t, node1, node2)
fmt.Println("Databases match!")
bridge.Disconnect()
node1.Stop()
node2.Stop()
}
4 changes: 1 addition & 3 deletions integration_testing/mining_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ func TestRegtestMiner(t *testing.T) {

// wait for node1 to sync blocks
mineHeight := uint32(40)
listener := make(chan bool)
listenForBlockHeight(t, node1, mineHeight, listener)
<-listener
<-listenForBlockHeight(node1, mineHeight)

node1.Stop()
}
38 changes: 38 additions & 0 deletions integration_testing/networking_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package integration_testing

import (
"github.com/deso-protocol/core/cmd"
"github.com/deso-protocol/core/lib"
"github.com/stretchr/testify/require"
"os"
"testing"
"time"
)

func TestSimpleConnectDisconnect(t *testing.T) {
require := require.New(t)
_ = require

dbDir1 := getDirectory(t)
defer os.RemoveAll(dbDir1)

config1 := generateConfig(t, 18000, dbDir1, 10)
config1.SyncType = lib.NodeSyncTypeBlockSync
config1.MaxSyncBlockHeight = 100
node1 := cmd.NewNode(config1)
node1 = startNode(t, node1)

// TODO: connect node1 to deso-seed-2.io
node1.Server.GetConnectionController().SetTargetOutboundPeers(0)

<-listenForBlockHeight(node1, 50)
// TODO: close connection
time.Sleep(3 * time.Second)
peers := node1.Server.GetConnectionManager().GetAllPeers()
for _, peer := range peers {
if peer.ID == 1 {
t.Fatalf("Should have disconnected from peer 1")
}
}
node1.Stop()
}
Loading

0 comments on commit fa1a29a

Please sign in to comment.