Skip to content

Commit

Permalink
fix: reserve wg caused data race in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
notanatol committed Jul 21, 2023
1 parent 4e924b1 commit 9e20a14
Show file tree
Hide file tree
Showing 3 changed files with 2,376 additions and 2 deletions.
8 changes: 6 additions & 2 deletions pkg/storer/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
localmigration "github.com/ethersphere/bee/pkg/storer/migration"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/afero"
"github.com/syndtr/goleveldb/leveldb"
Expand Down Expand Up @@ -467,7 +468,7 @@ type DB struct {
directUploadLimiter chan struct{}

reserve *reserve.Reserve
reserveWg sync.WaitGroup
reserveWg *util.WaitingCounter
reserveBinEvents *events.Subscriber
baseAddr swarm.Address
batchstore postage.Storer
Expand Down Expand Up @@ -549,6 +550,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
wakeupDuration: opts.ReserveWakeUpDuration,
},
directUploadLimiter: make(chan struct{}, pusher.ConcurrentPushes),
reserveWg: new(util.WaitingCounter),
}

if db.validStamp == nil {
Expand Down Expand Up @@ -595,7 +597,9 @@ func (db *DB) Close() error {
bgReserveWorkersClosed := make(chan struct{})
go func() {
defer close(bgReserveWorkersClosed)
db.reserveWg.Wait()
if c := db.reserveWg.Wait(5 * time.Second); c > 0 {
db.logger.Warning("db shutting down with running goroutines")
}
}()

bgCacheWorkersClosed := make(chan struct{})
Expand Down
39 changes: 39 additions & 0 deletions pkg/util/waitingcounter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2023 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package util

import (
"sync/atomic"
"time"
)

type WaitingCounter atomic.Int32

func (r *WaitingCounter) Add(c int32) {
(*atomic.Int32)(r).Add(c)
}

func (r *WaitingCounter) Done() {
r.Add(-1)
}

func (r *WaitingCounter) Wait(waitFor time.Duration) (runningGoroutines int) {
finish := time.Now().Add(waitFor)
for {
runningGoroutines = int((*atomic.Int32)(r).Load())

if runningGoroutines == 0 {
return
} else if runningGoroutines < 0 {
panic("number of running goroutines can not be negative")
}

if time.Now().After(finish) {
return
}

time.Sleep(100 * time.Millisecond)
}
}
Loading

0 comments on commit 9e20a14

Please sign in to comment.