Skip to content

Commit

Permalink
Add bloom filter metrics to the p2p sdk (#2612)
Browse files Browse the repository at this point in the history
Co-authored-by: Stephen Buttolph <[email protected]>
  • Loading branch information
ceyonur and StephenButtolph authored Jan 19, 2024
1 parent 18cdb09 commit 9b302a5
Show file tree
Hide file tree
Showing 14 changed files with 181 additions and 98 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/DataDog/zstd v1.5.2
github.com/Microsoft/go-winio v0.5.2
github.com/NYTimes/gziphandler v1.1.1
github.com/ava-labs/coreth v0.12.10-rc.5
github.com/ava-labs/coreth v0.12.10-wip-bloom-metrics
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34
github.com/btcsuite/btcd/btcutil v1.1.3
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/coreth v0.12.10-rc.5 h1:FMVvXHssvMQ3Eade7i85Wsx9tuD3kUOFMG8ktHeDTp8=
github.com/ava-labs/coreth v0.12.10-rc.5/go.mod h1:a58HbIBc9jscGc3aL8e7JuG8RfhBBOm63waq1q0YM+U=
github.com/ava-labs/coreth v0.12.10-wip-bloom-metrics h1:uSISbdHpLVlTkwUUqVonMgdX91ePbgP/qJ7iHup9xS4=
github.com/ava-labs/coreth v0.12.10-wip-bloom-metrics/go.mod h1:bw3pxnF+IBw5SoxAKujUiZPbPimieJHPd9fQWZZ/MOM=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 h1:mg9Uw6oZFJKytJxgxnl3uxZOs/SB8CVHg6Io4Tf99Zc=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down
53 changes: 11 additions & 42 deletions network/ip_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ava-labs/avalanchego/utils/ips"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/math"
"github.com/ava-labs/avalanchego/utils/metric"
"github.com/ava-labs/avalanchego/utils/sampler"
"github.com/ava-labs/avalanchego/utils/set"
)
Expand All @@ -42,6 +43,11 @@ func newIPTracker(
namespace string,
registerer prometheus.Registerer,
) (*ipTracker, error) {
bloomNamespace := metric.AppendNamespace(namespace, "ip_bloom")
bloomMetrics, err := bloom.NewMetrics(bloomNamespace, registerer)
if err != nil {
return nil, err
}
tracker := &ipTracker{
log: log,
numValidatorIPs: prometheus.NewGauge(prometheus.GaugeOpts{
Expand All @@ -54,44 +60,15 @@ func newIPTracker(
Name: "gossipable_ips",
Help: "Number of IPs this node is willing to gossip",
}),
bloomCount: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "ip_bloom_count",
Help: "Number of IP entries added to the bloom",
}),
bloomNumHashes: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "ip_bloom_hashes",
Help: "Number of hashes in the IP bloom",
}),
bloomNumEntries: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "ip_bloom_entries",
Help: "Number of entry slots in the IP bloom",
}),
bloomMaxCount: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "ip_bloom_max_count",
Help: "Maximum number of IP entries that can be added to the bloom before resetting",
}),
bloomResetCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "ip_bloom_reset_count",
Help: "Number times the IP bloom has been reset",
}),
bloomMetrics: bloomMetrics,
connected: make(map[ids.NodeID]*ips.ClaimedIPPort),
mostRecentValidatorIPs: make(map[ids.NodeID]*ips.ClaimedIPPort),
gossipableIndicies: make(map[ids.NodeID]int),
bloomAdditions: make(map[ids.NodeID]int),
}
err := utils.Err(
err = utils.Err(
registerer.Register(tracker.numValidatorIPs),
registerer.Register(tracker.numGossipable),
registerer.Register(tracker.bloomCount),
registerer.Register(tracker.bloomNumHashes),
registerer.Register(tracker.bloomNumEntries),
registerer.Register(tracker.bloomMaxCount),
registerer.Register(tracker.bloomResetCount),
)
if err != nil {
return nil, err
Expand All @@ -103,11 +80,7 @@ type ipTracker struct {
log logging.Logger
numValidatorIPs prometheus.Gauge
numGossipable prometheus.Gauge
bloomCount prometheus.Gauge
bloomNumHashes prometheus.Gauge
bloomNumEntries prometheus.Gauge
bloomMaxCount prometheus.Gauge
bloomResetCount prometheus.Counter
bloomMetrics *bloom.Metrics

lock sync.RWMutex
// Manually tracked nodes are always treated like validators
Expand Down Expand Up @@ -315,7 +288,7 @@ func (i *ipTracker) updateMostRecentValidatorIP(ip *ips.ClaimedIPPort) {

i.bloomAdditions[ip.NodeID] = oldCount + 1
bloom.Add(i.bloom, ip.GossipID[:], i.bloomSalt)
i.bloomCount.Inc()
i.bloomMetrics.Count.Inc()
}

func (i *ipTracker) addGossipableIP(ip *ips.ClaimedIPPort) {
Expand Down Expand Up @@ -428,10 +401,6 @@ func (i *ipTracker) resetBloom() error {
bloom.Add(newFilter, ip.GossipID[:], newSalt)
i.bloomAdditions[nodeID] = 1
}
i.bloomCount.Set(float64(len(i.mostRecentValidatorIPs)))
i.bloomNumHashes.Set(float64(numHashes))
i.bloomNumEntries.Set(float64(numEntries))
i.bloomMaxCount.Set(float64(i.maxBloomCount))
i.bloomResetCount.Inc()
i.bloomMetrics.Reset(newFilter, i.maxBloomCount)
return nil
}
4 changes: 2 additions & 2 deletions network/ip_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func requireMetricsConsistent(t *testing.T, tracker *ipTracker) {
require := require.New(t)
require.Equal(float64(len(tracker.mostRecentValidatorIPs)), testutil.ToFloat64(tracker.numValidatorIPs))
require.Equal(float64(len(tracker.gossipableIPs)), testutil.ToFloat64(tracker.numGossipable))
require.Equal(float64(tracker.bloom.Count()), testutil.ToFloat64(tracker.bloomCount))
require.Equal(float64(tracker.maxBloomCount), testutil.ToFloat64(tracker.bloomMaxCount))
require.Equal(float64(tracker.bloom.Count()), testutil.ToFloat64(tracker.bloomMetrics.Count))
require.Equal(float64(tracker.maxBloomCount), testutil.ToFloat64(tracker.bloomMetrics.MaxCount))
}

func TestIPTracker_ManuallyTrack(t *testing.T) {
Expand Down
71 changes: 45 additions & 26 deletions network/p2p/gossip/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package gossip
import (
"crypto/rand"

"github.com/prometheus/client_golang/prometheus"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/bloom"
"github.com/ava-labs/avalanchego/utils/math"
Expand All @@ -18,36 +20,39 @@ import (
// Invariant: The returned bloom filter is not safe to reset concurrently with
// other operations. However, it is otherwise safe to access concurrently.
func NewBloomFilter(
registerer prometheus.Registerer,
namespace string,
minTargetElements int,
targetFalsePositiveProbability,
resetFalsePositiveProbability float64,
) (*BloomFilter, error) {
numHashes, numEntries := bloom.OptimalParameters(
minTargetElements,
targetFalsePositiveProbability,
)
b, err := bloom.New(numHashes, numEntries)
metrics, err := bloom.NewMetrics(namespace, registerer)
if err != nil {
return nil, err
}

salt, err := randomSalt()
return &BloomFilter{
filter := &BloomFilter{
minTargetElements: minTargetElements,
targetFalsePositiveProbability: targetFalsePositiveProbability,
resetFalsePositiveProbability: resetFalsePositiveProbability,

maxCount: bloom.EstimateCount(numHashes, numEntries, resetFalsePositiveProbability),
bloom: b,
salt: salt,
}, err
metrics: metrics,
}
err = resetBloomFilter(
filter,
minTargetElements,
targetFalsePositiveProbability,
resetFalsePositiveProbability,
)
return filter, err
}

type BloomFilter struct {
minTargetElements int
targetFalsePositiveProbability float64
resetFalsePositiveProbability float64

metrics *bloom.Metrics

maxCount int
bloom *bloom.Filter
// salt is provided to eventually unblock collisions in Bloom. It's possible
Expand All @@ -59,6 +64,7 @@ type BloomFilter struct {
func (b *BloomFilter) Add(gossipable Gossipable) {
h := gossipable.GossipID()
bloom.Add(b.bloom, h[:], b.salt[:])
b.metrics.Count.Inc()
}

func (b *BloomFilter) Has(gossipable Gossipable) bool {
Expand Down Expand Up @@ -88,26 +94,39 @@ func ResetBloomFilterIfNeeded(
return false, nil
}

numHashes, numEntries := bloom.OptimalParameters(
math.Max(bloomFilter.minTargetElements, targetElements),
targetElements = math.Max(bloomFilter.minTargetElements, targetElements)
err := resetBloomFilter(
bloomFilter,
targetElements,
bloomFilter.targetFalsePositiveProbability,
bloomFilter.resetFalsePositiveProbability,
)
return err == nil, err
}

func resetBloomFilter(
bloomFilter *BloomFilter,
targetElements int,
targetFalsePositiveProbability,
resetFalsePositiveProbability float64,
) error {
numHashes, numEntries := bloom.OptimalParameters(
targetElements,
targetFalsePositiveProbability,
)
newBloom, err := bloom.New(numHashes, numEntries)
if err != nil {
return false, err
return err
}
salt, err := randomSalt()
if err != nil {
return false, err
var newSalt ids.ID
if _, err := rand.Read(newSalt[:]); err != nil {
return err
}
bloomFilter.maxCount = bloom.EstimateCount(numHashes, numEntries, bloomFilter.resetFalsePositiveProbability)

bloomFilter.maxCount = bloom.EstimateCount(numHashes, numEntries, resetFalsePositiveProbability)
bloomFilter.bloom = newBloom
bloomFilter.salt = salt
return true, nil
}
bloomFilter.salt = newSalt

func randomSalt() (ids.ID, error) {
salt := ids.ID{}
_, err := rand.Read(salt[:])
return salt, err
bloomFilter.metrics.Reset(newBloom, bloomFilter.maxCount)
return nil
}
49 changes: 29 additions & 20 deletions network/p2p/gossip/bloom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (

"golang.org/x/exp/slices"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/bloom"
)

func TestBloomFilterRefresh(t *testing.T) {
Expand All @@ -20,7 +22,7 @@ func TestBloomFilterRefresh(t *testing.T) {
minTargetElements int
targetFalsePositiveProbability float64
resetFalsePositiveProbability float64
reset bool
resetCount uint64
add []*testTx
expected []*testTx
}{
Expand All @@ -29,7 +31,7 @@ func TestBloomFilterRefresh(t *testing.T) {
minTargetElements: 1,
targetFalsePositiveProbability: 0.01,
resetFalsePositiveProbability: 1,
reset: false, // maxCount = 9223372036854775807
resetCount: 0, // maxCount = 9223372036854775807
add: []*testTx{
{id: ids.ID{0}},
{id: ids.ID{1}},
Expand All @@ -46,7 +48,7 @@ func TestBloomFilterRefresh(t *testing.T) {
minTargetElements: 1,
targetFalsePositiveProbability: 0.01,
resetFalsePositiveProbability: 0.0000000000000001, // maxCount = 1
reset: true,
resetCount: 1,
add: []*testTx{
{id: ids.ID{0}},
{id: ids.ID{1}},
Expand All @@ -56,43 +58,50 @@ func TestBloomFilterRefresh(t *testing.T) {
{id: ids.ID{2}},
},
},
{
name: "multiple refresh",
minTargetElements: 1,
targetFalsePositiveProbability: 0.01,
resetFalsePositiveProbability: 0.0000000000000001, // maxCount = 1
resetCount: 2,
add: []*testTx{
{id: ids.ID{0}},
{id: ids.ID{1}},
{id: ids.ID{2}},
{id: ids.ID{3}},
{id: ids.ID{4}},
},
expected: []*testTx{
{id: ids.ID{4}},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
numHashes, numEntries := bloom.OptimalParameters(
tt.minTargetElements,
tt.targetFalsePositiveProbability,
)
b, err := bloom.New(numHashes, numEntries)
bloom, err := NewBloomFilter(prometheus.NewRegistry(), "", tt.minTargetElements, tt.targetFalsePositiveProbability, tt.resetFalsePositiveProbability)
require.NoError(err)
bloom := BloomFilter{
bloom: b,
maxCount: bloom.EstimateCount(numHashes, numEntries, tt.resetFalsePositiveProbability),
minTargetElements: tt.minTargetElements,
targetFalsePositiveProbability: tt.targetFalsePositiveProbability,
resetFalsePositiveProbability: tt.resetFalsePositiveProbability,
}

var didReset bool
var resetCount uint64
for _, item := range tt.add {
bloomBytes, saltBytes := bloom.Marshal()
initialBloomBytes := slices.Clone(bloomBytes)
initialSaltBytes := slices.Clone(saltBytes)

reset, err := ResetBloomFilterIfNeeded(&bloom, len(tt.add))
reset, err := ResetBloomFilterIfNeeded(bloom, len(tt.add))
require.NoError(err)
if reset {
didReset = reset
resetCount++
}
bloom.Add(item)

require.Equal(initialBloomBytes, bloomBytes)
require.Equal(initialSaltBytes, saltBytes)
}

require.Equal(tt.reset, didReset)
require.Equal(tt.resetCount, resetCount)
require.Equal(float64(tt.resetCount+1), testutil.ToFloat64(bloom.metrics.ResetCount))
for _, expected := range tt.expected {
require.True(bloom.Has(expected))
}
Expand Down
6 changes: 3 additions & 3 deletions network/p2p/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestGossiperGossip(t *testing.T) {
responseNetwork, err := p2p.NewNetwork(logging.NoLog{}, responseSender, prometheus.NewRegistry(), "")
require.NoError(err)

responseBloom, err := NewBloomFilter(1000, 0.01, 0.05)
responseBloom, err := NewBloomFilter(prometheus.NewRegistry(), "", 1000, 0.01, 0.05)
require.NoError(err)
responseSet := &testSet{
txs: make(map[ids.ID]*testTx),
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestGossiperGossip(t *testing.T) {
require.NoError(err)
require.NoError(requestNetwork.Connected(context.Background(), ids.EmptyNodeID, nil))

bloom, err := NewBloomFilter(1000, 0.01, 0.05)
bloom, err := NewBloomFilter(prometheus.NewRegistry(), "", 1000, 0.01, 0.05)
require.NoError(err)
requestSet := &testSet{
txs: make(map[ids.ID]*testTx),
Expand Down Expand Up @@ -365,7 +365,7 @@ func TestPushGossipE2E(t *testing.T) {
knownTx := &testTx{id: ids.GenerateTestID()}

log := logging.NoLog{}
bloom, err := NewBloomFilter(100, 0.01, 0.05)
bloom, err := NewBloomFilter(prometheus.NewRegistry(), "", 100, 0.01, 0.05)
require.NoError(err)
set := &testSet{
txs: make(map[ids.ID]*testTx),
Expand Down
Loading

0 comments on commit 9b302a5

Please sign in to comment.