Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoS ConnectionController #861

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions integration_testing/blocksync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ func TestSimpleBlockSync(t *testing.T) {
// wait for node1 to sync blocks
waitForNodeToFullySync(node1)

// bridge the nodes together.
bridge := NewConnectionBridge(node1, node2)
require.NoError(bridge.Start())
// TODO: Dial an outbound connection from node2 to node1
// Fix other integration tests.

// wait for node2 to sync blocks.
waitForNodeToFullySync(node2)
Expand Down Expand Up @@ -99,6 +98,7 @@ func TestSimpleSyncRestart(t *testing.T) {
compareNodesByDB(t, node1, node2, 0)
fmt.Println("Random restart successful! Random height was", randomHeight)
fmt.Println("Databases match!")
bridge.Disconnect()
node1.Stop()
node2.Stop()
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestSimpleSyncDisconnectWithSwitchingToNewPeer(t *testing.T) {

randomHeight := randomUint32Between(t, 10, config2.MaxSyncBlockHeight)
fmt.Println("Random height for a restart (re-use if test failed):", randomHeight)
disconnectAtBlockHeight(t, node2, bridge12, randomHeight)
disconnectAtBlockHeight(node2, bridge12, randomHeight)

// bridge the nodes together.
bridge23 := NewConnectionBridge(node2, node3)
Expand All @@ -167,6 +167,8 @@ func TestSimpleSyncDisconnectWithSwitchingToNewPeer(t *testing.T) {
compareNodesByDB(t, node3, node2, 0)
fmt.Println("Random restart successful! Random height was", randomHeight)
fmt.Println("Databases match!")
bridge12.Disconnect()
bridge23.Disconnect()
node1.Stop()
node2.Stop()
node3.Stop()
Expand Down
64 changes: 42 additions & 22 deletions integration_testing/connection_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"
)

// TODO: DEPRECATE
// ConnectionBridge is a bidirectional communication channel between two nodes. A bridge creates a pair of inbound and
// outbound peers for each of the nodes to handle communication. In total, it creates four peers.
//
Expand Down Expand Up @@ -111,13 +112,14 @@ func (bridge *ConnectionBridge) createInboundConnection(node *cmd.Node) *lib.Pee
}

// This channel is redundant in our setting.
messagesFromPeer := make(chan *lib.ServerMessage)
messagesFromPeer := make(chan *lib.ServerMessage, 100)
newPeerChan := make(chan *lib.Peer, 100)
donePeerChan := make(chan *lib.Peer, 100)
// Because it is an inbound Peer of the node, it is simultaneously a "fake" outbound Peer of the bridge.
// Hence, we will mark the _isOutbound parameter as "true" in NewPeer.
peer := lib.NewPeer(conn, true, netAddress, true,
10000, 0, &lib.DeSoMainnetParams,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny)
peer.ID = uint64(lib.RandInt64(math.MaxInt64))
peer := lib.NewPeer(uint64(lib.RandInt64(math.MaxInt64)), conn, true,
netAddress, true, 10000, 0, &lib.DeSoMainnetParams,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, newPeerChan, donePeerChan)
return peer
}

Expand All @@ -139,27 +141,28 @@ func (bridge *ConnectionBridge) createOutboundConnection(node *cmd.Node, otherNo
fmt.Println("createOutboundConnection: Got a connection from remote:", conn.RemoteAddr().String(),
"on listener:", ll.Addr().String())

na, err := lib.IPToNetAddr(conn.RemoteAddr().String(), otherNode.Server.GetConnectionManager().AddrMgr,
otherNode.Params)
messagesFromPeer := make(chan *lib.ServerMessage)
peer := lib.NewPeer(conn, false, na, false,
10000, 0, bridge.nodeB.Params,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny)
peer.ID = uint64(lib.RandInt64(math.MaxInt64))
addrMgr := addrmgr.New("", net.LookupIP)
na, err := lib.IPToNetAddr(conn.RemoteAddr().String(), addrMgr, otherNode.Params)
messagesFromPeer := make(chan *lib.ServerMessage, 100)
newPeerChan := make(chan *lib.Peer, 100)
donePeerChan := make(chan *lib.Peer, 100)
peer := lib.NewPeer(uint64(lib.RandInt64(math.MaxInt64)), conn,
false, na, false, 10000, 0, bridge.nodeB.Params,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, newPeerChan, donePeerChan)
bridge.newPeerChan <- peer
//}
}(ll)

// Make the provided node to make an outbound connection to our listener.
netAddress, _ := lib.IPToNetAddr(ll.Addr().String(), addrmgr.New("", net.LookupIP), &lib.DeSoMainnetParams)
fmt.Println("createOutboundConnection: IP:", netAddress.IP, "Port:", netAddress.Port)
go node.Server.GetConnectionManager().ConnectPeer(nil, netAddress)
addrMgr := addrmgr.New("", net.LookupIP)
addr, _ := lib.IPToNetAddr(ll.Addr().String(), addrMgr, node.Params)
go node.Server.GetConnectionManager().DialOutboundConnection(addr, uint64(lib.RandInt64(math.MaxInt64)))
}

// getVersionMessage simulates a version message that the provided node would have sent.
func (bridge *ConnectionBridge) getVersionMessage(node *cmd.Node) *lib.MsgDeSoVersion {
ver := lib.NewMessage(lib.MsgTypeVersion).(*lib.MsgDeSoVersion)
ver.Version = node.Params.ProtocolVersion
ver.Version = node.Params.ProtocolVersion.ToUint64()
ver.TstampSecs = time.Now().Unix()
ver.Nonce = uint64(lib.RandInt64(math.MaxInt64))
ver.UserAgent = node.Params.UserAgent
Expand All @@ -172,12 +175,29 @@ func (bridge *ConnectionBridge) getVersionMessage(node *cmd.Node) *lib.MsgDeSoVe
}

if node.Server != nil {
ver.LatestBlockHeight = uint32(node.Server.GetBlockchain().BlockTip().Header.Height)
ver.LatestBlockHeight = node.Server.GetBlockchain().BlockTip().Header.Height
}
ver.MinFeeRateNanosPerKB = node.Config.MinFeerate
return ver
}

func ReadWithTimeout(readFunc func() error, readTimeout time.Duration) error {
errChan := make(chan error)
go func() {
errChan <- readFunc()
}()
select {
case err := <-errChan:
{
return err
}
case <-time.After(readTimeout):
{
return fmt.Errorf("ReadWithTimeout: Timed out reading message")
}
}
}

// startConnection starts the connection by performing version and verack exchange with
// the provided connection, pretending to be the otherNode.
func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode *cmd.Node) error {
Expand All @@ -192,7 +212,7 @@ func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode
}

// Wait for a response to the version message.
if err := connection.ReadWithTimeout(
if err := ReadWithTimeout(
func() error {
msg, err := connection.ReadDeSoMessage()
if err != nil {
Expand All @@ -215,15 +235,15 @@ func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode

// Now prepare the verack message.
verackMsg := lib.NewMessage(lib.MsgTypeVerack)
verackMsg.(*lib.MsgDeSoVerack).Nonce = connection.VersionNonceReceived
verackMsg.(*lib.MsgDeSoVerack).NonceReceived = connection.VersionNonceReceived

// And send it to the connection.
if err := connection.WriteDeSoMessage(verackMsg); err != nil {
return err
}

// And finally wait for connection's response to the verack message.
if err := connection.ReadWithTimeout(
if err := ReadWithTimeout(
func() error {
msg, err := connection.ReadDeSoMessage()
if err != nil {
Expand All @@ -234,9 +254,9 @@ func (bridge *ConnectionBridge) startConnection(connection *lib.Peer, otherNode
return fmt.Errorf("message is not verack! Type: %v", msg.GetMsgType())
}
verackMsg := msg.(*lib.MsgDeSoVerack)
if verackMsg.Nonce != connection.VersionNonceSent {
if verackMsg.NonceReceived != connection.VersionNonceSent {
return fmt.Errorf("verack message nonce doesn't match (received: %v, sent: %v)",
verackMsg.Nonce, connection.VersionNonceSent)
verackMsg.NonceReceived, connection.VersionNonceSent)
}
return nil
}, lib.DeSoMainnetParams.VersionNegotiationTimeout); err != nil {
Expand Down
Loading
Loading