From 47f6258c62e5295b92d2fad27fc080bde41d74a8 Mon Sep 17 00:00:00 2001
From: "Masih H. Derkani"
Date: Mon, 7 Oct 2024 10:57:05 +0100
Subject: [PATCH] Use fresh timer every time for F3 backoffs

To avoid the potential for deadlock in case f3Participator is ever used
from multiple goroutines, use throw-away timers at the price of higher
GC load.

Also, explicitly call the context's cancel function from a unified stop
hook that waits for the participation to end before returning.
---
 node/modules/storageminer.go | 33 +++++++++++++++++++--------------
 1 file changed, 19 insertions(+), 14 deletions(-)

diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go
index 61916dc13a..ef0335ac41 100644
--- a/node/modules/storageminer.go
+++ b/node/modules/storageminer.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"net/http"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/google/uuid"
@@ -360,7 +361,6 @@ type f3Participator struct {
 	node                     v1api.FullNode
 	participant              address.Address
 	backoff                  *backoff.Backoff
-	timer                    *time.Timer
 	maxCheckProgressAttempts int
 	previousTicket           api.F3ParticipationTicket
 	checkProgressInterval    time.Duration
@@ -372,7 +372,6 @@ func newF3Participator(node v1api.FullNode, participant dtypes.MinerAddress, bac
 		node:                     node,
 		participant:              address.Address(participant),
 		backoff:                  backoff,
-		timer:                    time.NewTimer(0),
 		maxCheckProgressAttempts: maxCheckProgress,
 		checkProgressInterval:    checkProgressInterval,
 		leaseTerm:                leaseTerm,
@@ -482,23 +481,18 @@ func (p *f3Participator) backOff(ctx context.Context) {
 }
 
 func (p *f3Participator) backOffFor(ctx context.Context, d time.Duration) {
-	if !p.timer.Stop() {
-		<-p.timer.C
-	}
-	p.timer.Reset(d)
+	// Create a fresh timer every time to avoid any risk of deadlock, and any need
+	// for a mutex, even though f3Participator is never (and should never be)
+	// called from multiple goroutines.
+	timer := time.NewTimer(d)
 	select {
 	case <-ctx.Done():
 		return
-	case <-p.timer.C:
+	case <-timer.C:
 	}
 }
 
-func (p *f3Participator) stop() {
-	p.timer.Stop()
-}
-
 func F3Participation(mctx helpers.MetricsCtx, lc fx.Lifecycle, node v1api.FullNode, participant dtypes.MinerAddress) error {
-	ctx := helpers.LifecycleCtx(mctx, lc)
 	const (
 		// maxCheckProgressAttempts defines the maximum number of failed attempts
 		// before we abandon the current lease and restart the participation process.
@@ -516,8 +510,12 @@ func F3Participation(mctx helpers.MetricsCtx, lc fx.Lifecycle, node v1api.FullNo
 		// leaseTerm The number of instances the miner will attempt to lease from nodes.
 		leaseTerm = 5
 	)
-
+	ctx, cancel := context.WithCancel(mctx)
+	var wg sync.WaitGroup
+	wg.Add(1)
 	go func() {
+		defer wg.Done()
+
 		participator := newF3Participator(
 			node,
 			participant,
@@ -530,7 +528,6 @@ func F3Participation(mctx helpers.MetricsCtx, lc fx.Lifecycle, node v1api.FullNo
 			checkProgressInterval,
 			leaseTerm,
 		)
-		defer participator.stop()
 
 		switch err := participator.participate(ctx); {
 		case err == nil, errors.Is(err, context.Canceled):
@@ -539,6 +536,14 @@ func F3Participation(mctx helpers.MetricsCtx, lc fx.Lifecycle, node v1api.FullNo
 			log.Errorw("F3 participation stopped abruptly", "err", err)
 		}
 	}()
+
+	lc.Append(fx.Hook{
+		OnStop: func(ctx context.Context) error {
+			cancel()
+			wg.Wait()
+			return nil
+		},
+	})
 	return nil
 }
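
For context on the backOffFor change: per the time.Timer documentation,
the Stop-then-drain idiom (if !t.Stop() { <-t.C }) is only safe when the
program has not already received from t.C. backOffFor's select does
receive from the channel, so a later drain can block forever. Below is
a minimal standalone sketch, not part of the patch, of the fresh-timer
pattern it adopts; the package layout and the waitFresh helper are
illustrative only:

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// waitFresh mirrors the pattern the patch adopts: a throw-away timer
	// per wait, so no stale channel state can carry over between calls.
	func waitFresh(ctx context.Context, d time.Duration) {
		timer := time.NewTimer(d)
		// Optional tidiness the patch itself omits: an abandoned timer
		// simply fires and becomes garbage.
		defer timer.Stop()
		select {
		case <-ctx.Done():
		case <-timer.C:
		}
	}

	func main() {
		// The removed idiom, for contrast. After a completed wait, Stop()
		// returns false (the timer already fired) while C is empty (the
		// select already drained it), so this would block forever:
		//
		//	if !timer.Stop() {
		//		<-timer.C
		//	}
		//	timer.Reset(d)

		ctx := context.Background()
		waitFresh(ctx, 10*time.Millisecond)
		waitFresh(ctx, 10*time.Millisecond) // safe to call repeatedly
		fmt.Println("done backing off")
	}

The allocation per call is the "higher GC load" the commit message
accepts in exchange for never having to reason about stale timer state.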
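
The lifecycle half of the change follows the same shape: cancel, then
wait. A sketch of that cancel-then-wait shutdown, again illustrative
rather than lifted from the patch (a plain function stands in for the
fx OnStop hook):

	package main

	import (
		"context"
		"fmt"
		"sync"
		"time"
	)

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		var wg sync.WaitGroup
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Stand-in for participator.participate(ctx): run until the
			// context is cancelled.
			<-ctx.Done()
			fmt.Println("participation stopped")
		}()

		// Equivalent of the new OnStop hook: cancel first, then wait, so
		// the goroutine cannot outlive shutdown and race later teardown.
		onStop := func() {
			cancel()
			wg.Wait()
		}

		time.Sleep(10 * time.Millisecond) // let the worker start
		onStop()
		fmt.Println("shutdown complete")
	}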