
Commit

Move mempoolIDs back to mempool/ids.go (cometbft#467)
hvanz authored Mar 8, 2023
1 parent (93c0edd), commit 24040f5
Showing 4 changed files with 104 additions and 115 deletions.
mempool/ids.go: 69 additions, 1 deletion

@@ -1,3 +1,71 @@
 package mempool
 
-// These functions were moved into v0/reactor.go and v1/reactor.go
+import (
+	"fmt"
+
+	cmtsync "github.com/cometbft/cometbft/libs/sync"
+	"github.com/cometbft/cometbft/p2p"
+)
+
+type mempoolIDs struct {
+	mtx       cmtsync.RWMutex
+	peerMap   map[p2p.ID]uint16
+	nextID    uint16              // assumes that a node will never have over 65536 active peers
+	activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter
+}
+
+// Reserve searches for the next unused ID and assigns it to the
+// peer.
+func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
+	ids.mtx.Lock()
+	defer ids.mtx.Unlock()
+
+	curID := ids.nextPeerID()
+	ids.peerMap[peer.ID()] = curID
+	ids.activeIDs[curID] = struct{}{}
+}
+
+// nextPeerID returns the next unused peer ID to use.
+// This assumes that ids's mutex is already locked.
+func (ids *mempoolIDs) nextPeerID() uint16 {
+	if len(ids.activeIDs) == MaxActiveIDs {
+		panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", MaxActiveIDs))
+	}
+
+	_, idExists := ids.activeIDs[ids.nextID]
+	for idExists {
+		ids.nextID++
+		_, idExists = ids.activeIDs[ids.nextID]
+	}
+	curID := ids.nextID
+	ids.nextID++
+	return curID
+}
+
+// Reclaim returns the ID reserved for the peer back to unused pool.
+func (ids *mempoolIDs) Reclaim(peer p2p.Peer) {
+	ids.mtx.Lock()
+	defer ids.mtx.Unlock()
+
+	removedID, ok := ids.peerMap[peer.ID()]
+	if ok {
+		delete(ids.activeIDs, removedID)
+		delete(ids.peerMap, peer.ID())
+	}
+}
+
+// GetForPeer returns an ID reserved for the peer.
+func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 {
+	ids.mtx.RLock()
+	defer ids.mtx.RUnlock()
+
+	return ids.peerMap[peer.ID()]
+}
+
+func newMempoolIDs() *mempoolIDs {
+	return &mempoolIDs{
+		peerMap:   make(map[p2p.ID]uint16),
+		activeIDs: map[uint16]struct{}{0: {}},
+		nextID:    1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx
+	}
+}
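The consumer of this type is the mempool Reactor, which keeps an ids *mempoolIDs field (visible in the mempool/reactor.go context lines further down). As a rough sketch of how these methods are typically driven from the p2p peer lifecycle (not part of this commit; the callback bodies are simplified, and broadcastTxRoutine is the reactor's existing gossip loop):

// Illustrative sketch only, not part of this diff; field and helper names
// come from the surrounding mempool package but the bodies are simplified.
func (memR *Reactor) AddPeer(peer p2p.Peer) {
	// Reserve a compact uint16 ID for the peer before gossiping txs to it.
	memR.ids.ReserveForPeer(peer)
	go memR.broadcastTxRoutine(peer)
}

func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
	// Return the peer's ID to the pool so a later peer can reuse it.
	memR.ids.Reclaim(peer)
}

// When a tx arrives from a peer, the reserved ID tags the sender, so the
// mempool can avoid gossiping that tx back to the peer it came from; e.g.
// TxInfo{SenderID: memR.ids.GetForPeer(peer)} is passed along with the tx.

ID 0 is never handed out: newMempoolIDs pre-populates activeIDs with 0 and starts nextID at 1, reserving unknownPeerID(0) for transactions submitted locally through BroadcastTx rather than received from a peer.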
mempool/ids_test.go: 35 additions, 16 deletions

@@ -1,23 +1,42 @@
 package mempool
 
-// import (
-// "testing"
+import (
+	"net"
+	"testing"
 
-// "github.com/stretchr/testify/require"
-// "github.com/cometbft/cometbft/types"
-// )
+	"github.com/cometbft/cometbft/p2p/mock"
+	"github.com/stretchr/testify/assert"
+)
 
-// func TestMempoolIDsBasic(t *testing.T) {
-// ids := NewMempoolIDs()
+func TestMempoolIDsBasic(t *testing.T) {
+	ids := newMempoolIDs()
 
-// peerID, err := types.NewNodeID("0011223344556677889900112233445566778899")
-// require.NoError(t, err)
+	peer := mock.NewPeer(net.IP{127, 0, 0, 1})
 
-// ids.ReserveForPeer(peerID)
-// require.EqualValues(t, 1, ids.GetForPeer(peerID))
-// ids.Reclaim(peerID)
+	ids.ReserveForPeer(peer)
+	assert.EqualValues(t, 1, ids.GetForPeer(peer))
+	ids.Reclaim(peer)
 
-// ids.ReserveForPeer(peerID)
-// require.EqualValues(t, 2, ids.GetForPeer(peerID))
-// ids.Reclaim(peerID)
-// }
+	ids.ReserveForPeer(peer)
+	assert.EqualValues(t, 2, ids.GetForPeer(peer))
+	ids.Reclaim(peer)
+}
+
+func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) {
+	if testing.Short() {
+		return
+	}
+
+	// 0 is already reserved for UnknownPeerID
+	ids := newMempoolIDs()
+
+	for i := 0; i < MaxActiveIDs-1; i++ {
+		peer := mock.NewPeer(net.IP{127, 0, 0, 1})
+		ids.ReserveForPeer(peer)
+	}
+
+	assert.Panics(t, func() {
+		peer := mock.NewPeer(net.IP{127, 0, 0, 1})
+		ids.ReserveForPeer(peer)
+	})
+}
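These tests can be run on their own with, for example, go test ./mempool -run TestMempoolIDs. Note that the panic test returns early under go test -short, since it loops over MaxActiveIDs-1 reservations before triggering the panic.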
mempool/reactor.go: 0 additions, 64 deletions

@@ -8,7 +8,6 @@ import (
 	cfg "github.com/cometbft/cometbft/config"
 	"github.com/cometbft/cometbft/libs/clist"
 	"github.com/cometbft/cometbft/libs/log"
-	cmtsync "github.com/cometbft/cometbft/libs/sync"
 	"github.com/cometbft/cometbft/p2p"
 	protomem "github.com/cometbft/cometbft/proto/tendermint/mempool"
 	"github.com/cometbft/cometbft/types"
@@ -24,69 +23,6 @@ type Reactor struct {
 	ids *mempoolIDs
 }
 
-type mempoolIDs struct {
-	mtx       cmtsync.RWMutex
-	peerMap   map[p2p.ID]uint16
-	nextID    uint16              // assumes that a node will never have over 65536 active peers
-	activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter
-}
-
-// Reserve searches for the next unused ID and assigns it to the
-// peer.
-func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
-	ids.mtx.Lock()
-	defer ids.mtx.Unlock()
-
-	curID := ids.nextPeerID()
-	ids.peerMap[peer.ID()] = curID
-	ids.activeIDs[curID] = struct{}{}
-}
-
-// nextPeerID returns the next unused peer ID to use.
-// This assumes that ids's mutex is already locked.
-func (ids *mempoolIDs) nextPeerID() uint16 {
-	if len(ids.activeIDs) == MaxActiveIDs {
-		panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", MaxActiveIDs))
-	}
-
-	_, idExists := ids.activeIDs[ids.nextID]
-	for idExists {
-		ids.nextID++
-		_, idExists = ids.activeIDs[ids.nextID]
-	}
-	curID := ids.nextID
-	ids.nextID++
-	return curID
-}
-
-// Reclaim returns the ID reserved for the peer back to unused pool.
-func (ids *mempoolIDs) Reclaim(peer p2p.Peer) {
-	ids.mtx.Lock()
-	defer ids.mtx.Unlock()
-
-	removedID, ok := ids.peerMap[peer.ID()]
-	if ok {
-		delete(ids.activeIDs, removedID)
-		delete(ids.peerMap, peer.ID())
-	}
-}
-
-// GetForPeer returns an ID reserved for the peer.
-func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 {
-	ids.mtx.RLock()
-	defer ids.mtx.RUnlock()
-
-	return ids.peerMap[peer.ID()]
-}
-
-func newMempoolIDs() *mempoolIDs {
-	return &mempoolIDs{
-		peerMap:   make(map[p2p.ID]uint16),
-		activeIDs: map[uint16]struct{}{0: {}},
-		nextID:    1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx
-	}
-}
-
 // NewReactor returns a new Reactor with the given config and mempool.
 func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor {
 	memR := &Reactor{
mempool/reactor_test.go: 0 additions, 34 deletions

@@ -3,7 +3,6 @@ package mempool
 import (
 	"encoding/hex"
 	"errors"
-	"net"
 	"sync"
 	"testing"
 	"time"
@@ -230,39 +229,6 @@ func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) {
 	leaktest.CheckTimeout(t, 10*time.Second)()
 }
 
-func TestMempoolIDsBasic(t *testing.T) {
-	ids := newMempoolIDs()
-
-	peer := mock.NewPeer(net.IP{127, 0, 0, 1})
-
-	ids.ReserveForPeer(peer)
-	assert.EqualValues(t, 1, ids.GetForPeer(peer))
-	ids.Reclaim(peer)
-
-	ids.ReserveForPeer(peer)
-	assert.EqualValues(t, 2, ids.GetForPeer(peer))
-	ids.Reclaim(peer)
-}
-
-func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) {
-	if testing.Short() {
-		return
-	}
-
-	// 0 is already reserved for UnknownPeerID
-	ids := newMempoolIDs()
-
-	for i := 0; i < MaxActiveIDs-1; i++ {
-		peer := mock.NewPeer(net.IP{127, 0, 0, 1})
-		ids.ReserveForPeer(peer)
-	}
-
-	assert.Panics(t, func() {
-		peer := mock.NewPeer(net.IP{127, 0, 0, 1})
-		ids.ReserveForPeer(peer)
-	})
-}
-
 // TODO: This test tests that we don't panic and are able to generate new
 // PeerIDs for each peer we add. It seems as though we should be able to test
 // this in a much more direct way.
