Skip to content

Commit

Permalink
PoS Remote Node Indexer and Manager (#879)
Browse files Browse the repository at this point in the history
* Add RemoteNodeIndexer

* Add HandshakeController

PoS Block Producer: TxnConnectStatusByIndex (#672)

* TransactionConnectStatus and ConnectFailingTransaction

* Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions"

This reverts commit d3e543c4c3e6f03cc74087b05c268d4449ba1689, reversing
changes made to 960001c.

* Revert "Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions""

This reverts commit 10a147654c5147c28ec674d0650bb54c8d9cebce.

* Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions"

This reverts commit d3e543c4c3e6f03cc74087b05c268d4449ba1689, reversing
changes made to a9f7827.

* TransactionConnectStatus and ConnectFailingTransaction

* Initial _connectFailingTransaction

* ConnectFailingTransaction and GlobalParamsEntry updates

* Fix merge conflicts

* gofmt

* Fix merge conflicts

* Fix blockheight

* Fix merge conflicts

* gofmt

* Revert connect failing transaction

* Add TxnStatusConnectedIndex to block and header

* Fix naming

* Fix tests; remove asserts

* Update comment

Integration testing updates

PoS Block Producer: TxnConnectStatusByIndex (#672)

* TransactionConnectStatus and ConnectFailingTransaction

* Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions"

This reverts commit d3e543c4c3e6f03cc74087b05c268d4449ba1689, reversing
changes made to 960001c.

* Revert "Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions""

This reverts commit 10a147654c5147c28ec674d0650bb54c8d9cebce.

* Revert "Merge branch 'p/bmf-status-connected' into p/failing-transactions"

This reverts commit d3e543c4c3e6f03cc74087b05c268d4449ba1689, reversing
changes made to a9f7827.

* TransactionConnectStatus and ConnectFailingTransaction

* Initial _connectFailingTransaction

* ConnectFailingTransaction and GlobalParamsEntry updates

* Fix merge conflicts

* gofmt

* Fix merge conflicts

* Fix blockheight

* Fix merge conflicts

* gofmt

* Revert connect failing transaction

* Add TxnStatusConnectedIndex to block and header

* Fix naming

* Fix tests; remove asserts

* Update comment

RemoteNode and RemoteNodeId

Initial remote node manager tests

remote node tests

Better connection testing framework

Add validator integration test

Fix validator-validator connection test; Add nonValidator-validator test

Simplify indices

Simplify remote node indexer; fix compilation

Simplify RemoteNodeManager

More RemoteNodeManager updates

Nits
  • Loading branch information
AeonSw4n authored Jan 10, 2024
1 parent 43d8e36 commit 2eb2632
Show file tree
Hide file tree
Showing 6 changed files with 512 additions and 19 deletions.
69 changes: 69 additions & 0 deletions collections/concurrent_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package collections

import "sync"

type ConcurrentMap[Key comparable, Value any] struct {
mtx sync.RWMutex
m map[Key]Value
}

func NewConcurrentMap[Key comparable, Value any]() *ConcurrentMap[Key, Value] {
return &ConcurrentMap[Key, Value]{
m: make(map[Key]Value),
}
}

func (cm *ConcurrentMap[Key, Value]) Set(key Key, val Value) {
cm.mtx.Lock()
defer cm.mtx.Unlock()

cm.m[key] = val
}

func (cm *ConcurrentMap[Key, Value]) Remove(key Key) {
cm.mtx.Lock()
defer cm.mtx.Unlock()

_, ok := cm.m[key]
if !ok {
return
}
delete(cm.m, key)
}

func (cm *ConcurrentMap[Key, Value]) Get(key Key) (Value, bool) {
cm.mtx.RLock()
defer cm.mtx.RUnlock()

val, ok := cm.m[key]
return val, ok
}

func (cm *ConcurrentMap[Key, Value]) Copy() map[Key]Value {
cm.mtx.RLock()
defer cm.mtx.RUnlock()

index := make(map[Key]Value)
for key, node := range cm.m {
index[key] = node
}
return index
}

func (cm *ConcurrentMap[Key, Value]) GetAll() []Value {
cm.mtx.RLock()
defer cm.mtx.RUnlock()

var vals []Value
for _, val := range cm.m {
vals = append(vals, val)
}
return vals
}

func (cm *ConcurrentMap[Key, Value]) Count() int {
cm.mtx.RLock()
defer cm.mtx.RUnlock()

return len(cm.m)
}
61 changes: 61 additions & 0 deletions collections/concurrent_map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package collections

import (
"fmt"
"testing"
)

func TestConcurrentMap(t *testing.T) {
m := NewConcurrentMap[string, int]()
control := make(map[string]int)

// test add
for ii := 0; ii < 100; ii++ {
key := fmt.Sprintf("%v", ii)
m.Set(key, ii)
control[key] = ii
}

for key, val := range control {
if mVal, ok := m.Get(key); !ok || mVal != val {
t.Errorf("Expected %d, got %d", val, m.m[key])
}
}

// test remove
for ii := 0; ii < 50; ii++ {
key := fmt.Sprintf("%v", ii)
m.Remove(key)
delete(control, key)
}

for key, val := range control {
if mVal, ok := m.Get(key); !ok || mVal != val {
t.Errorf("Expected %d, got %d", val, m.m[key])
}
}

// test copy
copy := m.Copy()
for key, val := range control {
if mVal, ok := copy[key]; !ok || mVal != val {
t.Errorf("Expected %d, got %d", val, m.m[key])
}
}
if len(copy) != len(control) {
t.Errorf("Expected %d, got %d", len(control), len(copy))
}

// test get all
vals := m.GetAll()
for _, val := range vals {
if _, ok := control[fmt.Sprintf("%v", val)]; !ok {
t.Errorf("Expected %d, got %d", val, m.m[fmt.Sprintf("%v", val)])
}
}

// test size
if m.Count() != len(control) {
t.Errorf("Expected %d, got %d", len(control), m.Count())
}
}
52 changes: 36 additions & 16 deletions lib/remote_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,19 @@ func (id RemoteNodeId) ToUint64() uint64 {
// lifecycle. Once the RemoteNode is terminated, it will be disposed of, and a new RemoteNode must be created if we
// wish to reconnect to the peer in the future.
type RemoteNode struct {
mtx sync.Mutex
mtx sync.RWMutex

peer *Peer
// The id is the unique identifier of this RemoteNode. For outbound connections, the id will be the same as the
// attemptId of the OutboundConnectionAttempt, and the subsequent id of the outbound peer. For inbound connections,
// the id will be the same as the inbound peer's id.
id RemoteNodeId
id RemoteNodeId
// validatorPublicKey is the BLS public key of the validator node. This is only set for validator nodes. For
// non-validator nodes, this will be nil. For outbound validators nodes, the validatorPublicKey will be set when
// the RemoteNode is instantiated. And for inbound validator nodes, the validatorPublicKey will be set when the
// handshake is completed.
validatorPublicKey *bls.PublicKey

connectionStatus RemoteNodeStatus

params *DeSoParams
Expand Down Expand Up @@ -134,10 +140,11 @@ func NewHandshakeMetadata() *HandshakeMetadata {
}
}

func NewRemoteNode(id RemoteNodeId, srv *Server, cmgr *ConnectionManager, keystore *BLSKeystore,
func NewRemoteNode(id RemoteNodeId, validatorPublicKey *bls.PublicKey, srv *Server, cmgr *ConnectionManager, keystore *BLSKeystore,
params *DeSoParams, minTxFeeRateNanosPerKB uint64, latestBlockHeight uint64, nodeServices ServiceFlag) *RemoteNode {
return &RemoteNode{
id: id,
validatorPublicKey: validatorPublicKey,
connectionStatus: RemoteNodeStatus_NotConnected,
handshakeMetadata: NewHandshakeMetadata(),
srv: srv,
Expand Down Expand Up @@ -183,7 +190,7 @@ func (rn *RemoteNode) GetNegotiatedProtocolVersion() ProtocolVersionType {
}

func (rn *RemoteNode) GetValidatorPublicKey() *bls.PublicKey {
return rn.handshakeMetadata.validatorPublicKey
return rn.validatorPublicKey
}

func (rn *RemoteNode) GetUserAgent() string {
Expand Down Expand Up @@ -231,41 +238,41 @@ func (rn *RemoteNode) IsValidator() bool {

// DialOutboundConnection dials an outbound connection to the provided netAddr.
func (rn *RemoteNode) DialOutboundConnection(netAddr *wire.NetAddress) error {
rn.mtx.Lock()
defer rn.mtx.Unlock()

if !rn.IsNotConnected() {
return fmt.Errorf("RemoteNode.DialOutboundConnection: RemoteNode is not in the NotConnected state")
}

rn.mtx.Lock()
defer rn.mtx.Unlock()

rn.cmgr.DialOutboundConnection(netAddr, rn.GetId().ToUint64())
rn.setStatusAttempted()
return nil
}

// DialPersistentOutboundConnection dials a persistent outbound connection to the provided netAddr.
func (rn *RemoteNode) DialPersistentOutboundConnection(netAddr *wire.NetAddress) error {
rn.mtx.Lock()
defer rn.mtx.Unlock()

if !rn.IsNotConnected() {
return fmt.Errorf("RemoteNode.DialPersistentOutboundConnection: RemoteNode is not in the NotConnected state")
}

rn.mtx.Lock()
defer rn.mtx.Unlock()

rn.cmgr.DialPersistentOutboundConnection(netAddr, rn.GetId().ToUint64())
rn.setStatusAttempted()
return nil
}

// AttachInboundConnection creates an inbound peer once a successful inbound connection has been established.
func (rn *RemoteNode) AttachInboundConnection(conn net.Conn, na *wire.NetAddress) error {
rn.mtx.Lock()
defer rn.mtx.Unlock()

if !rn.IsNotConnected() {
return fmt.Errorf("RemoteNode.AttachInboundConnection: RemoteNode is not in the NotConnected state")
}

rn.mtx.Lock()
defer rn.mtx.Unlock()

id := rn.GetId().ToUint64()
rn.peer = rn.cmgr.ConnectPeer(id, conn, na, false, false)
rn.setStatusConnected()
Expand All @@ -274,13 +281,13 @@ func (rn *RemoteNode) AttachInboundConnection(conn net.Conn, na *wire.NetAddress

// AttachOutboundConnection creates an outbound peer once a successful outbound connection has been established.
func (rn *RemoteNode) AttachOutboundConnection(conn net.Conn, na *wire.NetAddress, isPersistent bool) error {
rn.mtx.Lock()
defer rn.mtx.Unlock()

if rn.connectionStatus != RemoteNodeStatus_Attempted {
return fmt.Errorf("RemoteNode.AttachOutboundConnection: RemoteNode is not in the Attempted state")
}

rn.mtx.Lock()
defer rn.mtx.Unlock()

id := rn.GetId().ToUint64()
rn.peer = rn.cmgr.ConnectPeer(id, conn, na, true, isPersistent)
rn.setStatusConnected()
Expand All @@ -292,6 +299,10 @@ func (rn *RemoteNode) Disconnect() {
rn.mtx.Lock()
defer rn.mtx.Unlock()

if rn.connectionStatus == RemoteNodeStatus_Terminated {
return
}

id := rn.GetId().ToUint64()
switch rn.connectionStatus {
case RemoteNodeStatus_Attempted:
Expand All @@ -303,6 +314,9 @@ func (rn *RemoteNode) Disconnect() {
}

func (rn *RemoteNode) SendMessage(desoMsg DeSoMessage) error {
rn.mtx.RLock()
rn.mtx.RUnlock()

if rn.connectionStatus != RemoteNodeStatus_HandshakeCompleted {
return fmt.Errorf("SendMessage: Remote node is not connected")
}
Expand Down Expand Up @@ -614,8 +628,14 @@ func (rn *RemoteNode) validateVerackPoS(vrkMsg *MsgDeSoVerack) error {
"verack signature verification failed", rn.id)
}

if rn.validatorPublicKey != nil || rn.validatorPublicKey.Serialize() != vrkMsg.PublicKey.Serialize() {
return fmt.Errorf("RemoteNode.validateVerackPoS: Requesting disconnect for id: (%v) "+
"verack public key mismatch; message: %v; expected: %v", rn.id, vrkMsg.PublicKey, rn.validatorPublicKey)
}

// If we get here then the verack message is valid. Set the validator public key on the peer.
vMeta.validatorPublicKey = vrkMsg.PublicKey
rn.validatorPublicKey = vrkMsg.PublicKey
return nil
}

Expand Down
46 changes: 46 additions & 0 deletions lib/remote_node_indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package lib

import (
"github.com/deso-protocol/core/bls"
"github.com/deso-protocol/core/collections"
)

// RemoteNodeIndexer is a structure that holds information about all remote nodes and their indices.
type RemoteNodeIndexer struct {
// AllRemoteNodes is a map storing all remote nodes by their IDs.
AllRemoteNodes *collections.ConcurrentMap[RemoteNodeId, *RemoteNode]

// Indices for various types of remote nodes.
ValidatorIndex *collections.ConcurrentMap[bls.SerializedPublicKey, *RemoteNode]
NonValidatorOutboundIndex *collections.ConcurrentMap[RemoteNodeId, *RemoteNode]
NonValidatorInboundIndex *collections.ConcurrentMap[RemoteNodeId, *RemoteNode]
}

// NewRemoteNodeIndexer initializes and returns a new instance of RemoteNodeIndexer.
func NewRemoteNodeIndexer() *RemoteNodeIndexer {
rni := &RemoteNodeIndexer{
AllRemoteNodes: collections.NewConcurrentMap[RemoteNodeId, *RemoteNode](),
ValidatorIndex: collections.NewConcurrentMap[bls.SerializedPublicKey, *RemoteNode](),
NonValidatorOutboundIndex: collections.NewConcurrentMap[RemoteNodeId, *RemoteNode](),
NonValidatorInboundIndex: collections.NewConcurrentMap[RemoteNodeId, *RemoteNode](),
}

return rni
}

// Getter methods for accessing the different indices.
func (rni *RemoteNodeIndexer) GetAllRemoteNodes() *collections.ConcurrentMap[RemoteNodeId, *RemoteNode] {
return rni.AllRemoteNodes
}

func (rni *RemoteNodeIndexer) GetValidatorIndex() *collections.ConcurrentMap[bls.SerializedPublicKey, *RemoteNode] {
return rni.ValidatorIndex
}

func (rni *RemoteNodeIndexer) GetNonValidatorOutboundIndex() *collections.ConcurrentMap[RemoteNodeId, *RemoteNode] {
return rni.NonValidatorOutboundIndex
}

func (rni *RemoteNodeIndexer) GetNonValidatorInboundIndex() *collections.ConcurrentMap[RemoteNodeId, *RemoteNode] {
return rni.NonValidatorInboundIndex
}
Loading

0 comments on commit 2eb2632

Please sign in to comment.