Use fresh timer every time for F3 backoffs
To avoid the potential for deadlock in case f3Participator is used from
multiple goroutines, use throw-away timers at the price of slightly higher GC
pressure.

Also explicitly use the context's cancel function in a unified stop hook
that waits for participation to end before exiting.
masih committed Oct 7, 2024
1 parent 3d12ca5 commit 47f6258
Showing 1 changed file with 19 additions and 14 deletions.
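
The deadlock the message alludes to comes from the shared-timer idiom the diff removes: Stop(), drain timer.C, then Reset(). A timer delivers at most one value on its channel, so if two goroutines ever raced through that sequence, one of them could block forever on the drain. The throw-away-timer approach the commit adopts is sketched below; this is a minimal illustration rather than the Lotus code, and waitFor is a hypothetical helper.

// A minimal sketch, not the Lotus implementation: a backOffFor-style wait
// built on a throw-away timer, so no mutex is needed around Stop/Reset.
package main

import (
	"context"
	"fmt"
	"time"
)

// waitFor (hypothetical helper) blocks for d or until ctx is cancelled.
func waitFor(ctx context.Context, d time.Duration) {
	timer := time.NewTimer(d) // fresh timer per call; cheap, reclaimed by GC
	defer timer.Stop()        // release the timer early if ctx wins the race
	select {
	case <-ctx.Done():
	case <-timer.C:
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	waitFor(ctx, time.Second) // returns after ~50ms: the context fires first
	fmt.Println("backoff interrupted by context cancellation")
}
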
33 changes: 19 additions & 14 deletions node/modules/storageminer.go
@@ -5,6 +5,7 @@ import (
"errors"
"net/http"
"strings"
"sync"
"time"

"github.com/google/uuid"
@@ -360,7 +361,6 @@ type f3Participator struct {
node v1api.FullNode
participant address.Address
backoff *backoff.Backoff
timer *time.Timer
maxCheckProgressAttempts int
previousTicket api.F3ParticipationTicket
checkProgressInterval time.Duration
@@ -372,7 +372,6 @@ func newF3Participator(node v1api.FullNode, participant dtypes.MinerAddress, bac
node: node,
participant: address.Address(participant),
backoff: backoff,
timer: time.NewTimer(0),
maxCheckProgressAttempts: maxCheckProgress,
checkProgressInterval: checkProgressInterval,
leaseTerm: leaseTerm,
@@ -482,23 +481,18 @@ func (p *f3Participator) backOff(ctx context.Context) {
}

func (p *f3Participator) backOffFor(ctx context.Context, d time.Duration) {
if !p.timer.Stop() {
<-p.timer.C
}
p.timer.Reset(d)
// Create a fresh timer every time to avoid any potential risk of deadlock, and
// the need for a mutex, even though f3Participator is never (and should never be)
// called from multiple goroutines.
timer := time.NewTimer(d)
select {
case <-ctx.Done():
return
case <-p.timer.C:
case <-timer.C:
}
}

func (p *f3Participator) stop() {
p.timer.Stop()
}

func F3Participation(mctx helpers.MetricsCtx, lc fx.Lifecycle, node v1api.FullNode, participant dtypes.MinerAddress) error {
ctx := helpers.LifecycleCtx(mctx, lc)
const (
// maxCheckProgressAttempts defines the maximum number of failed attempts
// before we abandon the current lease and restart the participation process.
@@ -516,8 +510,12 @@
// leaseTerm The number of instances the miner will attempt to lease from nodes.
leaseTerm = 5
)

ctx, cancel := context.WithCancel(mctx)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

participator := newF3Participator(
node,
participant,
@@ -530,7 +528,6 @@
checkProgressInterval,
leaseTerm,
)
defer participator.stop()

switch err := participator.participate(ctx); {
case err == nil, errors.Is(err, context.Canceled):
@@ -539,6 +536,14 @@
log.Errorw("F3 participation stopped abruptly", "err", err)
}
}()

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
cancel()
wg.Wait()
return nil
},
})
return nil
}

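For reference, the shutdown wiring added at the end of the diff follows a common fx pattern: run the work in a goroutine bound to a cancellable context, then register an OnStop hook that cancels the context and waits on a sync.WaitGroup, so fx does not consider the component stopped while the goroutine is still running. A minimal sketch under those assumptions follows; runParticipation and registerParticipation are placeholders, not Lotus identifiers.

// A minimal sketch of the cancel-then-wait stop hook; not the Lotus code.
package modules

import (
	"context"
	"sync"

	"go.uber.org/fx"
)

func runParticipation(ctx context.Context) {
	<-ctx.Done() // stand-in for the real participation loop
}

func registerParticipation(lc fx.Lifecycle, mctx context.Context) {
	ctx, cancel := context.WithCancel(mctx)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		runParticipation(ctx)
	}()
	lc.Append(fx.Hook{
		OnStop: func(context.Context) error {
			cancel()  // signal the goroutine to stop
			wg.Wait() // block until it has actually returned
			return nil
		},
	})
}
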