Skip to content

Commit

Permalink
fix: reserve wg caused data race in tests (#4230)
Browse files Browse the repository at this point in the history
  • Loading branch information
notanatol authored Jul 24, 2023
1 parent 4e924b1 commit 35edb36
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 8 deletions.
12 changes: 6 additions & 6 deletions pkg/storer/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (db *DB) startReserveWorkers(

// start eviction worker first as there could be batch expirations because of
// initial contract sync
db.reserveWg.Add(1)
db.inFlight.Add(1)
go db.evictionWorker(ctx)

select {
Expand Down Expand Up @@ -87,12 +87,12 @@ func (db *DB) startReserveWorkers(
// syncing can begin now that the reserve worker is running
db.syncer.Start(ctx)

db.reserveWg.Add(1)
db.inFlight.Add(1)
go db.radiusWorker(ctx, wakeUpDur)
}

func (db *DB) radiusWorker(ctx context.Context, wakeUpDur time.Duration) {
defer db.reserveWg.Done()
defer db.inFlight.Done()

radiusWakeUpTicker := time.NewTicker(wakeUpDur)
defer radiusWakeUpTicker.Stop()
Expand Down Expand Up @@ -145,7 +145,7 @@ func (db *DB) removeExpiredBatch(ctx context.Context, batchID []byte) error {
}

func (db *DB) evictionWorker(ctx context.Context) {
defer db.reserveWg.Done()
defer db.inFlight.Done()

batchExpiryTrigger, batchExpiryUnsub := db.events.Subscribe(batchExpiry)
defer batchExpiryUnsub()
Expand Down Expand Up @@ -666,9 +666,9 @@ func (db *DB) SubscribeBin(ctx context.Context, bin uint8, start uint64) (<-chan
done := make(chan struct{})
errC := make(chan error, 1)

db.reserveWg.Add(1)
db.inFlight.Add(1)
go func() {
defer db.reserveWg.Done()
defer db.inFlight.Done()

trigger, unsub := db.reserveBinEvents.Subscribe(string(bin))
defer unsub()
Expand Down
8 changes: 6 additions & 2 deletions pkg/storer/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
localmigration "github.com/ethersphere/bee/pkg/storer/migration"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/afero"
"github.com/syndtr/goleveldb/leveldb"
Expand Down Expand Up @@ -467,7 +468,7 @@ type DB struct {
directUploadLimiter chan struct{}

reserve *reserve.Reserve
reserveWg sync.WaitGroup
inFlight *util.WaitingCounter
reserveBinEvents *events.Subscriber
baseAddr swarm.Address
batchstore postage.Storer
Expand Down Expand Up @@ -549,6 +550,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
wakeupDuration: opts.ReserveWakeUpDuration,
},
directUploadLimiter: make(chan struct{}, pusher.ConcurrentPushes),
inFlight: new(util.WaitingCounter),
}

if db.validStamp == nil {
Expand Down Expand Up @@ -595,7 +597,9 @@ func (db *DB) Close() error {
bgReserveWorkersClosed := make(chan struct{})
go func() {
defer close(bgReserveWorkersClosed)
db.reserveWg.Wait()
if c := db.inFlight.Wait(5 * time.Second); c > 0 {
db.logger.Warning("db shutting down with running goroutines")
}
}()

bgCacheWorkersClosed := make(chan struct{})
Expand Down
46 changes: 46 additions & 0 deletions pkg/util/waitingcounter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2023 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package util

import (
"sync/atomic"
"time"
)

// A WaitingCounter counts in-flight operations and lets a caller wait, with a
// timeout, for that count to drop to zero.
//
// The zero value is a counter at zero and is ready to use. Every method is
// implemented with atomic operations on the underlying atomic.Int32.
type WaitingCounter atomic.Int32

// Add adds c to the counter.
// As opposed to sync.WaitGroup, it is safe to be called after Wait has been called.
// Add itself performs no sign check; only Done panics on a negative result.
func (r *WaitingCounter) Add(c int32) {
	(*atomic.Int32)(r).Add(c)
}

// Done decrements the counter by one.
// If the counter goes below zero a panic will be raised.
func (r *WaitingCounter) Done() {
	if nv := (*atomic.Int32)(r).Add(-1); nv < 0 {
		panic("negative counter value")
	}
}

// Wait blocks until the counter reaches zero or until waitFor has elapsed,
// whichever happens first, and returns the last counter value it observed:
// zero (or a negative value) when the counter was drained, a positive value
// when the timeout was hit.
//
// The counter is polled at most every hundred milliseconds. The pause between
// polls never extends past the deadline, so a zero or negative waitFor makes
// Wait return right after a single read of the counter.
func (r *WaitingCounter) Wait(waitFor time.Duration) int {
	const pollInterval = 100 * time.Millisecond

	deadline := time.Now().Add(waitFor)
	for {
		c := int((*atomic.Int32)(r).Load())

		if c <= 0 { // we're done
			return c
		}

		remaining := time.Until(deadline)
		if remaining <= 0 { // timeout
			return c
		}

		// Never sleep past the deadline: a short timeout must not be
		// rounded up to a whole poll interval.
		if remaining > pollInterval {
			remaining = pollInterval
		}
		time.Sleep(remaining)
	}
}
53 changes: 53 additions & 0 deletions pkg/util/waitingcounter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2023 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package util_test

import (
"testing"
"time"

"github.com/ethersphere/bee/pkg/util"
)

// TestEarlyDone verifies that Wait returns as soon as the counter drops to
// zero instead of blocking for the whole timeout.
func TestEarlyDone(t *testing.T) {
	counter := new(util.WaitingCounter)
	counter.Add(1)

	started := time.Now()

	// Release the counter well before the one-second timeout expires.
	go func() {
		time.Sleep(200 * time.Millisecond)
		counter.Done()
	}()

	if left := counter.Wait(time.Second); left != 0 {
		t.Fatal("counter not zero")
	}

	if elapsed := time.Since(started); elapsed >= time.Second {
		t.Fatal("async done not detected")
	}
}

// TestDeadline verifies that Wait gives up once the timeout has elapsed while
// the counter is still positive, and that it reports the outstanding count.
func TestDeadline(t *testing.T) {
	const timeout = 200 * time.Millisecond

	counter := new(util.WaitingCounter)
	counter.Add(1)

	started := time.Now()

	if left := counter.Wait(timeout); left != 1 {
		t.Fatal("counter is zero, expected 1")
	}

	if time.Since(started) < timeout {
		t.Fatal("expected to wait at least given duration")
	}
}

0 comments on commit 35edb36

Please sign in to comment.