-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpeerStore.go
148 lines (132 loc) · 3.54 KB
/
peerStore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package p2p
import (
"fmt"
"sync"
)
// PeerStore tracks the currently active peers and offers several lookups
// over them. All access goes through the embedded read/write mutex, so the
// store can be shared between goroutines.
type PeerStore struct {
	// mtx is acquired by the store's methods before they touch any other field.
	mtx sync.RWMutex

	// peers indexes every tracked peer by its hash.
	peers map[string]*Peer
	// connected counts tracked peers per address key. Each peer is counted
	// twice: once under its bare IP and once under its endpoint string.
	connected map[string]int
	// curSlice caches the peers as a slice. It is reset to nil whenever a
	// peer is added or removed and rebuilt on demand by Slice.
	curSlice []*Peer

	// incoming and outgoing count the tracked peers by connection direction.
	incoming, outgoing int
}
// NewPeerStore returns an empty PeerStore whose lookup maps are allocated
// and ready for use.
func NewPeerStore() *PeerStore {
	return &PeerStore{
		peers:     make(map[string]*Peer),
		connected: make(map[string]int),
	}
}
// Add starts tracking p. It returns an error when p is nil or when a peer
// with the same hash is already tracked; in both cases the store is left
// unchanged.
func (ps *PeerStore) Add(p *Peer) error {
	if p == nil {
		return fmt.Errorf("trying to add nil")
	}

	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if _, dup := ps.peers[p.Hash]; dup {
		return fmt.Errorf("peer already exists")
	}

	ps.peers[p.Hash] = p
	// The cached slice no longer reflects the peer set.
	ps.curSlice = nil

	// Count the peer under its bare IP and under its full endpoint string.
	for _, key := range [...]string{p.Endpoint.IP, p.Endpoint.String()} {
		ps.connected[key]++
	}

	counter := &ps.outgoing
	if p.IsIncoming {
		counter = &ps.incoming
	}
	*counter++

	return nil
}
// Remove stops tracking p. The match is made on pointer identity, not on
// hash: if a different *Peer instance with the same hash is the one being
// tracked, the call does nothing, even when both instances hold equal
// values. A nil p is ignored.
func (ps *PeerStore) Remove(p *Peer) {
	if p == nil {
		return
	}

	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	tracked, found := ps.peers[p.Hash]
	if !found || tracked != p { // pointer comparison
		return
	}

	// Release the per-IP and per-endpoint counts, dropping a key entirely
	// once nothing is connected under it.
	for _, key := range [...]string{p.Endpoint.IP, p.Endpoint.String()} {
		ps.connected[key]--
		if ps.connected[key] == 0 {
			delete(ps.connected, key)
		}
	}

	counter := &ps.outgoing
	if tracked.IsIncoming {
		counter = &ps.incoming
	}
	*counter--

	delete(ps.peers, p.Hash)
	// The cached slice no longer reflects the peer set.
	ps.curSlice = nil
}
// Total reports how many peers are currently tracked.
func (ps *PeerStore) Total() int {
	ps.mtx.RLock()
	n := len(ps.peers)
	ps.mtx.RUnlock()
	return n
}
// Outgoing reports how many of the tracked peers are outgoing connections.
func (ps *PeerStore) Outgoing() int {
	ps.mtx.RLock()
	n := ps.outgoing
	ps.mtx.RUnlock()
	return n
}
// Incoming reports how many of the tracked peers are incoming connections.
func (ps *PeerStore) Incoming() int {
	ps.mtx.RLock()
	n := ps.incoming
	ps.mtx.RUnlock()
	return n
}
// Get looks up the tracked peer with the given hash. It returns nil when no
// such peer is tracked.
func (ps *PeerStore) Get(hash string) *Peer {
	ps.mtx.RLock()
	peer := ps.peers[hash]
	ps.mtx.RUnlock()
	return peer
}
// Connections returns the number of tracked peers registered under addr.
// Peers are counted under both their bare IP and their full endpoint string,
// so addr may be either form. An address with no peers yields 0.
//
// It returns a count, not a boolean, and is equivalent to Count; it delegates
// to Count so the two cannot drift apart.
func (ps *PeerStore) Connections(addr string) int {
	return ps.Count(addr)
}
// Connected reports whether at least one tracked peer is registered under
// the string form of the endpoint ep.
func (ps *PeerStore) Connected(ep Endpoint) bool {
	ps.mtx.RLock()
	defer ps.mtx.RUnlock()

	count := ps.connected[ep.String()]
	return count > 0
}
// Count returns the number of tracked peers registered under addr, which is
// matched against the keys of the connection counter (a bare IP or a full
// endpoint string). An address with no peers yields 0.
func (ps *PeerStore) Count(addr string) int {
	ps.mtx.RLock()
	n := ps.connected[addr]
	ps.mtx.RUnlock()
	return n
}
// Slice returns the currently tracked peers as a slice. The peers appear in
// no particular order: the order is whatever map iteration produced when the
// internal cache was last rebuilt, and it stays the same until a peer is
// added or removed.
//
// Every call returns a freshly allocated, non-nil slice, so callers may
// reorder or overwrite its elements without affecting the store or other
// callers. The *Peer values themselves are shared.
func (ps *PeerStore) Slice() []*Peer {
	// Fast path: the cache is valid, so a read lock is enough to copy it.
	ps.mtx.RLock()
	if ps.curSlice != nil {
		defer ps.mtx.RUnlock()
		return append(ps.curSlice[:0:0], ps.curSlice...)
	}
	ps.mtx.RUnlock()

	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	// No lock was held between RUnlock and Lock, so another goroutine may
	// already have rebuilt the cache; only rebuild if it is still missing.
	if ps.curSlice == nil {
		rebuilt := make([]*Peer, 0, len(ps.peers))
		for _, p := range ps.peers {
			rebuilt = append(rebuilt, p)
		}
		ps.curSlice = rebuilt
	}

	// Hand out a copy, never the cached backing array, so a caller that
	// modifies the result cannot corrupt what later callers receive.
	return append(ps.curSlice[:0:0], ps.curSlice...)
}