feat(sample): swip21 changes
istae committed Oct 4, 2024
1 parent a8066a2 commit 629e0f6
Showing 9 changed files with 178 additions and 42 deletions.
15 changes: 7 additions & 8 deletions pkg/node/node.go
@@ -248,6 +248,12 @@ func NewBee(
}
}(b)

if o.ReserveCapacityDoubling < 0 || o.ReserveCapacityDoubling > 1 {
return nil, fmt.Errorf("config reserve capacity doubling has to be between default: 0 and maximum: 1")
}

reserveCapacity := (1 << o.ReserveCapacityDoubling) * storer.DefaultReserveCapacity

stateStore, stateStoreMetrics, err := InitStateStore(logger, o.DataDir, o.StatestoreCacheCapacity)
if err != nil {
return nil, err
@@ -353,14 +359,6 @@ func NewBee(
var batchStore postage.Storer = new(postage.NoOpBatchStore)
var evictFn func([]byte) error

var reserveCapacity int

if o.ReserveCapacityDoubling >= 0 && o.ReserveCapacityDoubling <= 1 {
reserveCapacity = 1 << (22 + o.ReserveCapacityDoubling)
} else {
return nil, fmt.Errorf("config reserve capacity doubling has to be between default: 0 and maximum: 1")
}

if chainEnabled {
batchStore, err = batchstore.New(
stateStore,
@@ -735,6 +733,7 @@ func NewBee(
lo.ReserveWakeUpDuration = reserveWakeUpDuration
lo.ReserveMinEvictCount = reserveMinEvictCount
lo.RadiusSetter = kad
lo.ReserveCapacityDoubling = o.ReserveCapacityDoubling
}

localStore, err := storer.New(ctx, path, lo)
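The new check in NewBee validates ReserveCapacityDoubling and derives the effective reserve capacity by left-shifting the default. Below is a minimal sketch of that arithmetic, assuming storer.DefaultReserveCapacity equals 1 << 22 (implied by the removed expression 1 << (22 + o.ReserveCapacityDoubling)); the helper name reserveCapacityFor is hypothetical:

package main

import "fmt"

// defaultReserveCapacity stands in for storer.DefaultReserveCapacity; the removed
// expression 1 << (22 + o.ReserveCapacityDoubling) implies a default of 1 << 22 chunks.
const defaultReserveCapacity = 1 << 22

// reserveCapacityFor reproduces the validation and shift now performed in NewBee.
func reserveCapacityFor(doubling int) (int, error) {
	if doubling < 0 || doubling > 1 {
		return 0, fmt.Errorf("config reserve capacity doubling has to be between default: 0 and maximum: 1")
	}
	return (1 << doubling) * defaultReserveCapacity, nil
}

func main() {
	for _, d := range []int{0, 1} {
		c, _ := reserveCapacityFor(d)
		fmt.Printf("doubling=%d -> reserve capacity=%d\n", d, c) // 4194304, then 8388608
	}
}
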
2 changes: 1 addition & 1 deletion pkg/salud/salud.go
@@ -200,7 +200,7 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
continue
}

if networkRadius > 0 && peer.status.StorageRadius < uint32(networkRadius-1) {
if networkRadius > 0 && peer.status.StorageRadius < uint32(networkRadius-2) {
s.logger.Debug("radius health failure", "radius", peer.status.StorageRadius, "peer_address", peer.addr)
} else if peer.dur.Seconds() > pDur {
s.logger.Debug("response duration below threshold", "duration", peer.dur, "peer_address", peer.addr)
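The radius health check is loosened from one to two bins below the network-wide (most common) storage radius, so nodes reporting a smaller local radius after doubling their reserve are still counted healthy. A simplified sketch of the adjusted predicate, with a hypothetical name and signed arithmetic to sidestep uint8 wraparound at very small radii:

// radiusHealthy mirrors the adjusted check in salud.go: a peer now passes the
// radius check as long as its storage radius is no more than two bins below
// the network radius.
func radiusHealthy(peerRadius, networkRadius int) bool {
	return networkRadius <= 0 || peerRadius >= networkRadius-2
}

With a network radius of 8 this matches the updated test expectations below: a peer at radius 7 still passes, radius 6 would too, and radius 5 is now the failing case.
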
4 changes: 2 additions & 2 deletions pkg/salud/salud_test.go
@@ -37,11 +37,11 @@ func TestSalud(t *testing.T) {
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},

// healthy since radius >= most common radius - 1
// healthy since radius >= most common radius - 2
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 7, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},

// radius too low
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 6, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, false},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 5, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, false},

// dur too long
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 2, false},
4 changes: 2 additions & 2 deletions pkg/storer/cachestore.go
@@ -40,8 +40,8 @@ func (db *DB) cacheWorker(ctx context.Context) {
}

evict := size - capc
if evict < db.opts.cacheMinEvictCount { // evict at least a min count
evict = db.opts.cacheMinEvictCount
if evict < db.reserveOptions.cacheMinEvictCount { // evict at least a min count
evict = db.reserveOptions.cacheMinEvictCount
}

dur := captureDuration(time.Now())
1 change: 0 additions & 1 deletion pkg/storer/internal/reserve/reserve.go
@@ -502,7 +502,6 @@ func (r *Reserve) IterateChunksItems(startBin uint8, cb func(*ChunkBinItem) (boo
PrefixAtStart: true,
}, func(res storage.Result) (bool, error) {
item := res.Entry.(*ChunkBinItem)

stop, err := cb(item)
if stop || err != nil {
return true, err
10 changes: 5 additions & 5 deletions pkg/storer/reserve.go
@@ -55,7 +55,7 @@ func (db *DB) startReserveWorkers(
go db.reserveWorker(ctx)

select {
case <-time.After(db.opts.reserveWarmupDuration):
case <-time.After(db.reserveOptions.warmupDuration):
case <-db.quit:
return
}
@@ -121,7 +121,7 @@ func (db *DB) reserveWorker(ctx context.Context) {
overCapTrigger, overCapUnsub := db.events.Subscribe(reserveOverCapacity)
defer overCapUnsub()

thresholdTicker := time.NewTicker(db.opts.reserveWakeupDuration)
thresholdTicker := time.NewTicker(db.reserveOptions.wakeupDuration)
defer thresholdTicker.Stop()

_, _ = db.countWithinRadius(ctx)
@@ -159,7 +159,7 @@ func (db *DB) reserveWorker(ctx context.Context) {
continue
}

if count < threshold(db.reserve.Capacity()) && db.syncer.SyncRate() == 0 && radius > db.opts.minimumRadius {
if count < threshold(db.reserve.Capacity()) && db.syncer.SyncRate() == 0 && radius > db.reserveOptions.minimumRadius {
radius--
if err := db.reserve.SetRadius(radius); err != nil {
db.logger.Error(err, "reserve set radius")
@@ -362,8 +362,8 @@ func (db *DB) unreserve(ctx context.Context) (err error) {
}

evict := target - totalEvicted
if evict < int(db.opts.reserveMinEvictCount) { // evict at least a min count
evict = int(db.opts.reserveMinEvictCount)
if evict < int(db.reserveOptions.minEvictCount) { // evict at least a min count
evict = int(db.reserveOptions.minEvictCount)
}

binEvicted, err := db.evictBatch(ctx, b, evict, radius)
14 changes: 12 additions & 2 deletions pkg/storer/sample.go
@@ -139,6 +139,13 @@ func (db *DB) ReserveSample(

allStats.BatchesBelowValueDuration = time.Since(t)

// If the node has doubled its reserve capacity by some factor, the sampling process must pertain only to the
// chunks of the selected neighborhood, as determined by the anchor and the "network" radius, and NOT the whole reserve.
// The sampling must select chunks with proximity to the anchor greater than or equal to the regular network radius.
// The regular network storage radius is the sum of the local radius and the doubling factor.
// For example, if the regular radius is 11 and the local node has a doubling factor of 3, the local radius will eventually drop to 8.
neighborhoodProximity := storageRadius + uint8(db.reserveOptions.capacityDoubling)

// Phase 1: Iterate chunk addresses
g.Go(func() error {
start := time.Now()
@@ -149,9 +156,12 @@
addStats(stats)
}()

err := db.reserve.IterateChunksItems(storageRadius, func(chi *reserve.ChunkBinItem) (bool, error) {
err := db.reserve.IterateChunksItems(storageRadius, func(ch *reserve.ChunkBinItem) (bool, error) {
if swarm.Proximity(ch.Address.Bytes(), anchor) < neighborhoodProximity {
return false, nil
}
select {
case chunkC <- chi:
case chunkC <- ch:
stats.TotalIterated++
return false, nil
case <-ctx.Done():
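The iteration now starts at the (possibly smaller) local storage radius but skips any chunk whose proximity to the anchor falls below neighborhoodProximity, i.e. the local radius plus the doubling factor, so only the anchored neighborhood contributes to the sample. A self-contained sketch of that filter follows; proximity is a simplified stand-in for swarm.Proximity and inAnchoredNeighborhood is a hypothetical helper:

package main

import "fmt"

// proximity returns the proximity order (number of shared leading bits) between
// two addresses, a simplified stand-in for swarm.Proximity (no MaxPO cap).
func proximity(a, b []byte) uint8 {
	var po uint8
	for i := 0; i < len(a) && i < len(b); i++ {
		x := a[i] ^ b[i]
		if x == 0 {
			po += 8
			continue
		}
		for x&0x80 == 0 {
			po++
			x <<= 1
		}
		return po
	}
	return po
}

// inAnchoredNeighborhood mirrors the new filter in ReserveSample: a chunk is
// sampled only if its proximity to the anchor reaches the regular network
// radius, i.e. the local storage radius plus the capacity doubling factor.
func inAnchoredNeighborhood(chunkAddr, anchor []byte, storageRadius, capacityDoubling uint8) bool {
	return proximity(chunkAddr, anchor) >= storageRadius+capacityDoubling
}

func main() {
	anchor := []byte{0b1010_0000}
	near := []byte{0b1010_1000} // shares 4 leading bits with the anchor
	far := []byte{0b0010_0000}  // differs in the first bit
	fmt.Println(inAnchoredNeighborhood(near, anchor, 3, 1)) // true: PO 4 >= 3+1
	fmt.Println(inAnchoredNeighborhood(far, anchor, 3, 1))  // false: PO 0 < 3+1
}
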
135 changes: 130 additions & 5 deletions pkg/storer/sample_test.go
@@ -58,13 +58,18 @@ func TestReserveSampler(t *testing.T) {

var sample1 storer.Sample

var (
radius uint8 = 5
anchor = swarm.RandAddressAt(t, baseAddr, int(radius)).Bytes()
)

t.Run("reserve sample 1", func(t *testing.T) {
sample, err := st.ReserveSample(context.TODO(), []byte("anchor"), 5, timeVar, nil)
sample, err := st.ReserveSample(context.TODO(), anchor, radius, timeVar, nil)
if err != nil {
t.Fatal(err)
}

assertValidSample(t, sample)
assertValidSample(t, sample, radius, anchor)
assertSampleNoErrors(t, sample)

if sample.Stats.NewIgnored != 0 {
@@ -92,7 +97,7 @@
// Now we generate another sample with the older timestamp. This should give us
// the exact same sample, ensuring that none of the later chunks were considered.
t.Run("reserve sample 2", func(t *testing.T) {
sample, err := st.ReserveSample(context.TODO(), []byte("anchor"), 5, timeVar, nil)
sample, err := st.ReserveSample(context.TODO(), anchor, 5, timeVar, nil)
if err != nil {
t.Fatal(err)
}
@@ -136,14 +141,131 @@ func TestReserveSampler(t *testing.T) {
})
}

func TestReserveSamplerSisterNeighborhood(t *testing.T) {
const (
chunkCountPerPO = 32
maxPO = 6
networkRadius uint8 = 5
doublingFactor uint8 = 2
localRadius uint8 = networkRadius - doublingFactor
)

randChunks := func(baseAddr swarm.Address, startingRadius int, timeVar uint64) []swarm.Chunk {
var chs []swarm.Chunk
for po := startingRadius; po < maxPO; po++ {
for i := 0; i < chunkCountPerPO; i++ {
ch := chunk.GenerateValidRandomChunkAt(baseAddr, po).WithBatch(3, 2, false)
if rand.Intn(2) == 0 { // 50% chance to wrap CAC into SOC
ch = chunk.GenerateTestRandomSoChunk(t, ch)
}

// override stamp timestamp to be before the consensus timestamp
ch = ch.WithStamp(postagetesting.MustNewStampWithTimestamp(timeVar))
chs = append(chs, ch)
}
}
return chs
}

testF := func(t *testing.T, baseAddr swarm.Address, st *storer.DB) {
t.Helper()

timeVar := uint64(time.Now().UnixNano())
chs := randChunks(baseAddr, int(localRadius), timeVar)
putter := st.ReservePutter()
for _, ch := range chs {
err := putter.Put(context.Background(), ch)
if err != nil {
t.Fatal(err)
}
}

sisterAnchor := swarm.RandAddressAt(t, baseAddr, int(localRadius))

// chunks belonging to the sister neighborhood
chs = randChunks(sisterAnchor, int(localRadius), timeVar)
putter = st.ReservePutter()
for _, ch := range chs {
err := putter.Put(context.Background(), ch)
if err != nil {
t.Fatal(err)
}
}

t.Run("reserve size", reserveSizeTest(st.Reserve(), chunkCountPerPO*maxPO))

t.Run("reserve sample", func(t *testing.T) {
sample, err := st.ReserveSample(context.TODO(), sisterAnchor.Bytes(), 0, timeVar, nil)
if err != nil {
t.Fatal(err)
}

assertValidSample(t, sample, doublingFactor, baseAddr.Bytes())
assertSampleNoErrors(t, sample)

if sample.Stats.NewIgnored != 0 {
t.Fatalf("sample should not have ignored chunks")
}
})

t.Run("reserve sample 2", func(t *testing.T) {
sample, err := st.ReserveSample(context.TODO(), sisterAnchor.Bytes(), localRadius, timeVar, nil)
if err != nil {
t.Fatal(err)
}

assertValidSample(t, sample, localRadius, baseAddr.Bytes())
assertSampleNoErrors(t, sample)

for _, s := range sample.Items {
if got := swarm.Proximity(s.ChunkAddress.Bytes(), baseAddr.Bytes()); got != localRadius {
t.Fatalf("promixity must be exactly %d, got %d", localRadius, got)
}
}

if sample.Stats.NewIgnored != 0 {
t.Fatalf("sample should not have ignored chunks")
}
})

}

t.Run("disk", func(t *testing.T) {
t.Parallel()
baseAddr := swarm.RandAddress(t)
opts := dbTestOps(baseAddr, 1000, nil, nil, time.Second)
opts.ValidStamp = func(ch swarm.Chunk) (swarm.Chunk, error) { return ch, nil }
opts.ReserveCapacityDoubling = 2

storer, err := diskStorer(t, opts)()
if err != nil {
t.Fatal(err)
}
testF(t, baseAddr, storer)
})
t.Run("mem", func(t *testing.T) {
t.Parallel()
baseAddr := swarm.RandAddress(t)
opts := dbTestOps(baseAddr, 1000, nil, nil, time.Second)
opts.ValidStamp = func(ch swarm.Chunk) (swarm.Chunk, error) { return ch, nil }
opts.ReserveCapacityDoubling = 2

storer, err := memStorer(t, opts)()
if err != nil {
t.Fatal(err)
}
testF(t, baseAddr, storer)
})
}

func TestRandSample(t *testing.T) {
t.Parallel()

sample := storer.RandSample(t, nil)
assertValidSample(t, sample)
assertValidSample(t, sample, 0, nil)
}

func assertValidSample(t *testing.T, sample storer.Sample) {
func assertValidSample(t *testing.T, sample storer.Sample, minRadius uint8, anchor []byte) {
t.Helper()

// Assert that sample size is exactly storer.SampleSize
@@ -165,6 +287,9 @@ func assertValidSample(t *testing.T, sample storer.Sample) {
if item.Stamp == nil {
t.Fatalf("sample item [%d]: stamp should be set", i)
}
if got := swarm.Proximity(item.ChunkAddress.Bytes(), anchor); got < minRadius {
t.Fatalf("sample item [%d]: chunk should have proximity %d with the anchor, got %d", i, minRadius, got)
}
}
for i, item := range sample.Items {
assertSampleItem(item, i)
35 changes: 19 additions & 16 deletions pkg/storer/storer.go
@@ -379,9 +379,10 @@ type Options struct {
RadiusSetter topology.SetStorageRadiuser
StateStore storage.StateStorer

ReserveCapacity int
ReserveWakeUpDuration time.Duration
ReserveMinEvictCount uint64
ReserveCapacity int
ReserveWakeUpDuration time.Duration
ReserveMinEvictCount uint64
ReserveCapacityDoubling int

CacheCapacity uint64
CacheMinEvictCount uint64
@@ -437,17 +438,18 @@ type DB struct {
validStamp postage.ValidStampFn
setSyncerOnce sync.Once
syncer Syncer
opts workerOpts
reserveOptions reserveOpts

pinIntegrity *PinIntegrity
}

type workerOpts struct {
reserveWarmupDuration time.Duration
reserveWakeupDuration time.Duration
reserveMinEvictCount uint64
cacheMinEvictCount uint64
minimumRadius uint8
type reserveOpts struct {
warmupDuration time.Duration
wakeupDuration time.Duration
minEvictCount uint64
cacheMinEvictCount uint64
minimumRadius uint8
capacityDoubling int
}

// New returns a newly constructed DB object which implements all the above
@@ -534,12 +536,13 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
validStamp: opts.ValidStamp,
events: events.NewSubscriber(),
reserveBinEvents: events.NewSubscriber(),
opts: workerOpts{
reserveWarmupDuration: opts.WarmupDuration,
reserveWakeupDuration: opts.ReserveWakeUpDuration,
reserveMinEvictCount: opts.ReserveMinEvictCount,
cacheMinEvictCount: opts.CacheMinEvictCount,
minimumRadius: uint8(opts.MinimumStorageRadius),
reserveOptions: reserveOpts{
warmupDuration: opts.WarmupDuration,
wakeupDuration: opts.ReserveWakeUpDuration,
minEvictCount: opts.ReserveMinEvictCount,
cacheMinEvictCount: opts.CacheMinEvictCount,
minimumRadius: uint8(opts.MinimumStorageRadius),
capacityDoubling: opts.ReserveCapacityDoubling,
},
directUploadLimiter: make(chan struct{}, pusher.ConcurrentPushes),
pinIntegrity: pinIntegrity,
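The new ReserveCapacityDoubling option travels from storer.Options into the renamed reserveOpts struct. A hedged sketch of how a caller might set it when constructing the storer; the import path is assumed from the v2 module, the helper name is hypothetical, and other options the storer may require (logger, cache sizes, etc.) are omitted for brevity:

package main

import (
	"context"
	"time"

	"github.com/ethersphere/bee/v2/pkg/storer"
)

// newDoubledStorer is an illustrative helper: it opens a storer whose reserve
// capacity has been doubled once, matching ReserveCapacityDoubling = 1.
func newDoubledStorer(ctx context.Context, dataDir string) (*storer.DB, error) {
	opts := &storer.Options{
		ReserveCapacity:         (1 << 1) * storer.DefaultReserveCapacity, // doubled once
		ReserveCapacityDoubling: 1,
		ReserveWakeUpDuration:   30 * time.Minute, // placeholder value
		ReserveMinEvictCount:    1000,             // placeholder value
	}
	return storer.New(ctx, dataDir, opts)
}
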
