Skip to content

Commit

Permalink
Add HandshakeController
Browse files Browse the repository at this point in the history
PoS Block Producer: TxnConnectStatusByIndex (#672)

* TransactionConnectStatus and ConnectFailingTransaction

* Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions"

This reverts commit d3e543c4c3e6f03cc74087b05c268d4449ba1689, reversing
changes made to 960001c.

* Revert "Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions""

This reverts commit 10a147654c5147c28ec674d0650bb54c8d9cebce.

* Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions"

This reverts commit d3e543c4c3e6f03cc74087b05c268d4449ba1689, reversing
changes made to a9f7827.

* TransactionConnectStatus and ConnectFailingTransaction

* Initial _connectFailingTransaction

* ConnectFailingTransaction and GlobalParamsEntry updates

* Fix merge conflicts

* gofmt

* Fix merge conflicts

* Fix blockheight

* Fix merge conflicts

* gofmt

* Revert connect failing transaction

* Add TxnStatusConnectedIndex to block and header

* Fix naming

* Fix tests; remove asserts

* Update comment

Integration testing updates

PoS Block Producer: TxnConnectStatusByIndex (#672)

* TransactionConnectStatus and ConnectFailingTransaction

* Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions"

This reverts commit d3e543c4c3e6f03cc74087b05c268d4449ba1689, reversing
changes made to 960001c.

* Revert "Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions""

This reverts commit 10a147654c5147c28ec674d0650bb54c8d9cebce.

* Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions"

This reverts commit d3e543c4c3e6f03cc74087b05c268d4449ba1689, reversing
changes made to a9f7827.

* TransactionConnectStatus and ConnectFailingTransaction

* Initial _connectFailingTransaction

* ConnectFailingTransaction and GlobalParamsEntry updates

* Fix merge conflicts

* gofmt

* Fix merge conflicts

* Fix blockheight

* Fix merge conflicts

* gofmt

* Revert connect failing transaction

* Add TxnStatusConnectedIndex to block and header

* Fix naming

* Fix tests; remove asserts

* Update comment

RemoteNode and RemoteNodeId

Initial remote node manager tests

remote node tests

Better connection testing framework

Add validator integration test

Fix validator-validator connection test; Add nonValidator-validator test

Simplify indices

Simplify remote node indexer; fix compilation

Simplify RemoteNodeManager

More RemoteNodeManager updates

Nits
  • Loading branch information
AeonSw4n committed Jan 15, 2024
1 parent a86d1c8 commit 92e1d23
Show file tree
Hide file tree
Showing 19 changed files with 1,473 additions and 380 deletions.
9 changes: 5 additions & 4 deletions integration_testing/blocksync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ func TestSimpleBlockSync(t *testing.T) {
// wait for node1 to sync blocks
waitForNodeToFullySync(node1)

// bridge the nodes together.
bridge := NewConnectionBridge(node1, node2)
require.NoError(bridge.Start())
// TODO: Dial an outbound connection from node2 to node1

// wait for node2 to sync blocks.
waitForNodeToFullySync(node2)
Expand Down Expand Up @@ -99,6 +97,7 @@ func TestSimpleSyncRestart(t *testing.T) {
compareNodesByDB(t, node1, node2, 0)
fmt.Println("Random restart successful! Random height was", randomHeight)
fmt.Println("Databases match!")
bridge.Disconnect()
node1.Stop()
node2.Stop()
}
Expand Down Expand Up @@ -153,7 +152,7 @@ func TestSimpleSyncDisconnectWithSwitchingToNewPeer(t *testing.T) {

randomHeight := randomUint32Between(t, 10, config2.MaxSyncBlockHeight)
fmt.Println("Random height for a restart (re-use if test failed):", randomHeight)
disconnectAtBlockHeight(t, node2, bridge12, randomHeight)
disconnectAtBlockHeight(node2, bridge12, randomHeight)

// bridge the nodes together.
bridge23 := NewConnectionBridge(node2, node3)
Expand All @@ -167,6 +166,8 @@ func TestSimpleSyncDisconnectWithSwitchingToNewPeer(t *testing.T) {
compareNodesByDB(t, node3, node2, 0)
fmt.Println("Random restart successful! Random height was", randomHeight)
fmt.Println("Databases match!")
bridge12.Disconnect()
bridge23.Disconnect()
node1.Stop()
node2.Stop()
node3.Stop()
Expand Down
65 changes: 43 additions & 22 deletions integration_testing/connection_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,14 @@ func (bridge *ConnectionBridge) createInboundConnection(node *cmd.Node) *lib.Pee
}

// This channel is redundant in our setting.
messagesFromPeer := make(chan *lib.ServerMessage)
messagesFromPeer := make(chan *lib.ServerMessage, 100)
newPeerChan := make(chan *lib.Peer, 100)
donePeerChan := make(chan *lib.Peer, 100)
// Because it is an inbound Peer of the node, it is simultaneously a "fake" outbound Peer of the bridge.
// Hence, we will mark the _isOutbound parameter as "true" in NewPeer.
peer := lib.NewPeer(conn, true, netAddress, true,
10000, 0, &lib.DeSoMainnetParams,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny)
peer.ID = uint64(lib.RandInt64(math.MaxInt64))
peer := lib.NewPeer(uint64(lib.RandInt64(math.MaxInt64)), conn, true,
netAddress, true, 10000, 0, &lib.DeSoMainnetParams,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, newPeerChan, donePeerChan)
return peer
}

Expand All @@ -139,27 +140,30 @@ func (bridge *ConnectionBridge) createOutboundConnection(node *cmd.Node, otherNo
fmt.Println("createOutboundConnection: Got a connection from remote:", conn.RemoteAddr().String(),
"on listener:", ll.Addr().String())

na, err := lib.IPToNetAddr(conn.RemoteAddr().String(), otherNode.Server.GetConnectionManager().AddrMgr,
otherNode.Params)
messagesFromPeer := make(chan *lib.ServerMessage)
peer := lib.NewPeer(conn, false, na, false,
10000, 0, bridge.nodeB.Params,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny)
peer.ID = uint64(lib.RandInt64(math.MaxInt64))
addrMgr := addrmgr.New("", net.LookupIP)
na, err := lib.IPToNetAddr(conn.RemoteAddr().String(), addrMgr, otherNode.Params)
messagesFromPeer := make(chan *lib.ServerMessage, 100)
newPeerChan := make(chan *lib.Peer, 100)
donePeerChan := make(chan *lib.Peer, 100)
peer := lib.NewPeer(uint64(lib.RandInt64(math.MaxInt64)), conn,
false, na, false, 10000, 0, bridge.nodeB.Params,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, newPeerChan, donePeerChan)
bridge.newPeerChan <- peer
//}
}(ll)

// Make the provided node to make an outbound connection to our listener.
netAddress, _ := lib.IPToNetAddr(ll.Addr().String(), addrmgr.New("", net.LookupIP), &lib.DeSoMainnetParams)
fmt.Println("createOutboundConnection: IP:", netAddress.IP, "Port:", netAddress.Port)
go node.Server.GetConnectionManager().ConnectPeer(nil, netAddress)
//netAddress, _ := lib.IPToNetAddr(ll.Addr().String(), addrmgr.New("", net.LookupIP), &lib.DeSoMainnetParams)
//fmt.Println("createOutboundConnection: IP:", netAddress.IP, "Port:", netAddress.Port)
addrMgr := addrmgr.New("", net.LookupIP)
addr, _ := lib.IPToNetAddr(ll.Addr().String(), addrMgr, node.Params)
go node.Server.GetConnectionManager().DialOutboundConnection(addr, uint64(lib.RandInt64(math.MaxInt64)))
}

// getVersionMessage simulates a version message that the provided node would have sent.
func (bridge *ConnectionBridge) getVersionMessage(node *cmd.Node) *lib.MsgDeSoVersion {
ver := lib.NewMessage(lib.MsgTypeVersion).(*lib.MsgDeSoVersion)
ver.Version = node.Params.ProtocolVersion
ver.Version = node.Params.ProtocolVersion.ToUint64()
ver.TstampSecs = time.Now().Unix()
ver.Nonce = uint64(lib.RandInt64(math.MaxInt64))
ver.UserAgent = node.Params.UserAgent
Expand All @@ -172,12 +176,29 @@ func (bridge *ConnectionBridge) getVersionMessage(node *cmd.Node) *lib.MsgDeSoVe
}

if node.Server != nil {
ver.LatestBlockHeight = uint32(node.Server.GetBlockchain().BlockTip().Header.Height)
ver.LatestBlockHeight = node.Server.GetBlockchain().BlockTip().Header.Height
}
ver.MinFeeRateNanosPerKB = node.Config.MinFeerate
return ver
}

func ReadWithTimeout(readFunc func() error, readTimeout time.Duration) error {
errChan := make(chan error)
go func() {
errChan <- readFunc()
}()
select {
case err := <-errChan:
{
return err
}
case <-time.After(readTimeout):
{
return fmt.Errorf("ReadWithTimeout: Timed out reading message")
}
}
}

// startConnection starts the connection by performing version and verack exchange with
// the provided connection, pretending to be the otherNode.
func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode *cmd.Node) error {
Expand All @@ -192,7 +213,7 @@ func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode
}

// Wait for a response to the version message.
if err := connection.ReadWithTimeout(
if err := ReadWithTimeout(
func() error {
msg, err := connection.ReadDeSoMessage()
if err != nil {
Expand All @@ -215,15 +236,15 @@ func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode

// Now prepare the verack message.
verackMsg := lib.NewMessage(lib.MsgTypeVerack)
verackMsg.(*lib.MsgDeSoVerack).Nonce = connection.VersionNonceReceived
verackMsg.(*lib.MsgDeSoVerack).NonceReceived = connection.VersionNonceReceived

// And send it to the connection.
if err := connection.WriteDeSoMessage(verackMsg); err != nil {
return err
}

// And finally wait for connection's response to the verack message.
if err := connection.ReadWithTimeout(
if err := ReadWithTimeout(
func() error {
msg, err := connection.ReadDeSoMessage()
if err != nil {
Expand All @@ -234,9 +255,9 @@ func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode
return fmt.Errorf("message is not verack! Type: %v", msg.GetMsgType())
}
verackMsg := msg.(*lib.MsgDeSoVerack)
if verackMsg.Nonce != connection.VersionNonceSent {
if verackMsg.NonceReceived != connection.VersionNonceSent {
return fmt.Errorf("verack message nonce doesn't match (received: %v, sent: %v)",
verackMsg.Nonce, connection.VersionNonceSent)
verackMsg.NonceReceived, connection.VersionNonceSent)
}
return nil
}, lib.DeSoMainnetParams.VersionNegotiationTimeout); err != nil {
Expand Down
Loading

0 comments on commit 92e1d23

Please sign in to comment.