diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go index f18f509ac23..961d97aec35 100644 --- a/pkg/storer/reserve.go +++ b/pkg/storer/reserve.go @@ -59,7 +59,7 @@ func (db *DB) startReserveWorkers( // start eviction worker first as there could be batch expirations because of // initial contract sync - db.reserveWg.Add(1) + db.inFlight.Add(1) go db.evictionWorker(ctx) select { @@ -87,12 +87,12 @@ func (db *DB) startReserveWorkers( // syncing can now begin now that the reserver worker is running db.syncer.Start(ctx) - db.reserveWg.Add(1) + db.inFlight.Add(1) go db.radiusWorker(ctx, wakeUpDur) } func (db *DB) radiusWorker(ctx context.Context, wakeUpDur time.Duration) { - defer db.reserveWg.Done() + defer db.inFlight.Done() radiusWakeUpTicker := time.NewTicker(wakeUpDur) defer radiusWakeUpTicker.Stop() @@ -145,7 +145,7 @@ func (db *DB) removeExpiredBatch(ctx context.Context, batchID []byte) error { } func (db *DB) evictionWorker(ctx context.Context) { - defer db.reserveWg.Done() + defer db.inFlight.Done() batchExpiryTrigger, batchExpiryUnsub := db.events.Subscribe(batchExpiry) defer batchExpiryUnsub() @@ -666,9 +666,9 @@ func (db *DB) SubscribeBin(ctx context.Context, bin uint8, start uint64) (<-chan done := make(chan struct{}) errC := make(chan error, 1) - db.reserveWg.Add(1) + db.inFlight.Add(1) go func() { - defer db.reserveWg.Done() + defer db.inFlight.Done() trigger, unsub := db.reserveBinEvents.Subscribe(string(bin)) defer unsub() diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index 5b573303e25..13154734ac4 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -37,6 +37,7 @@ import ( localmigration "github.com/ethersphere/bee/pkg/storer/migration" "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/topology" + "github.com/ethersphere/bee/pkg/util" "github.com/prometheus/client_golang/prometheus" "github.com/spf13/afero" "github.com/syndtr/goleveldb/leveldb" @@ -467,7 +468,7 @@ type DB struct { directUploadLimiter chan struct{} reserve *reserve.Reserve - reserveWg sync.WaitGroup + inFlight *util.WaitingCounter reserveBinEvents *events.Subscriber baseAddr swarm.Address batchstore postage.Storer @@ -549,6 +550,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) { wakeupDuration: opts.ReserveWakeUpDuration, }, directUploadLimiter: make(chan struct{}, pusher.ConcurrentPushes), + inFlight: new(util.WaitingCounter), } if db.validStamp == nil { @@ -595,7 +597,9 @@ func (db *DB) Close() error { bgReserveWorkersClosed := make(chan struct{}) go func() { defer close(bgReserveWorkersClosed) - db.reserveWg.Wait() + if c := db.inFlight.Wait(5 * time.Second); c > 0 { + db.logger.Warning("db shutting down with running goroutines") + } }() bgCacheWorkersClosed := make(chan struct{}) diff --git a/pkg/util/waitingcounter.go b/pkg/util/waitingcounter.go new file mode 100644 index 00000000000..d947c8f1ba8 --- /dev/null +++ b/pkg/util/waitingcounter.go @@ -0,0 +1,46 @@ +// Copyright 2023 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package util + +import ( + "sync/atomic" + "time" +) + +// A WaitingCounter waits for a counter to go to zero. +type WaitingCounter atomic.Int32 + +// Add increments the counter. +// As opposed to sync.WaitGroup, it is safe to be called after Wait has been called. +func (r *WaitingCounter) Add(c int32) { + (*atomic.Int32)(r).Add(c) +} + +// Done decrements the counter. +// If the counter goes bellow zero a panic will be raised. +func (r *WaitingCounter) Done() { + if nv := (*atomic.Int32)(r).Add(-1); nv < 0 { + panic("negative counter value") + } +} + +// Wait blocks waiting for the counter value to reach zero or for the timeout to be reached. +// Is guaranteed to wait for at least a hundred milliseconds regardless of given duration if the counter is positive value. +func (r *WaitingCounter) Wait(waitFor time.Duration) int { + deadline := time.Now().Add(waitFor) + for { + c := int((*atomic.Int32)(r).Load()) + + if c <= 0 { // we're done + return c + } + + if time.Now().After(deadline) { // timeout + return c + } + + time.Sleep(100 * time.Millisecond) + } +} diff --git a/pkg/util/waitingcounter_test.go b/pkg/util/waitingcounter_test.go new file mode 100644 index 00000000000..7cdd2ef43ba --- /dev/null +++ b/pkg/util/waitingcounter_test.go @@ -0,0 +1,53 @@ +// Copyright 2023 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package util_test + +import ( + "testing" + "time" + + "github.com/ethersphere/bee/pkg/util" +) + +func TestEarlyDone(t *testing.T) { + wc := new(util.WaitingCounter) + + wc.Add(1) + + s := time.Now() + + go func() { + time.Sleep(200 * time.Millisecond) + wc.Done() + }() + + r := wc.Wait(time.Second) + + if r != 0 { + t.Fatal("counter not zero") + } + + if time.Since(s) >= time.Second { + t.Fatal("async done not detected") + } +} + +func TestDeadline(t *testing.T) { + wc := new(util.WaitingCounter) + + wc.Add(1) + + s := time.Now() + + r := wc.Wait(200 * time.Millisecond) + + if r != 1 { + t.Fatal("counter is zero, expected 1") + } + + if time.Since(s) < 200*time.Millisecond { + t.Fatal("expected to wait at least given duration") + } +}