From 8df6152afe35aa2360382a595758ca7d7771d4d1 Mon Sep 17 00:00:00 2001 From: Yihang Wang Date: Fri, 28 Jun 2024 18:00:32 +0800 Subject: [PATCH] fix: date race at status.go --- status.go | 50 ++++++++++++++++++++++++++------------------------ 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/status.go b/status.go index 25abc5b..f744ff7 100644 --- a/status.go +++ b/status.go @@ -2,6 +2,7 @@ package gojob import ( "sync" + "sync/atomic" "time" ) @@ -15,18 +16,21 @@ type Status struct { } type statusManager struct { - numFailed int64 - numSucceed int64 - numTotal int64 + numFailed atomic.Int64 + numSucceed atomic.Int64 + numTotal atomic.Int64 - mutex *sync.Mutex + mutex sync.Mutex ticker *time.Ticker statusChans []chan Status } func newStatusManager() *statusManager { return &statusManager{ - mutex: &sync.Mutex{}, + numFailed: atomic.Int64{}, + numSucceed: atomic.Int64{}, + numTotal: atomic.Int64{}, + mutex: sync.Mutex{}, ticker: time.NewTicker(5 * time.Second), statusChans: []chan Status{}, } @@ -34,9 +38,11 @@ func newStatusManager() *statusManager { func (sm *statusManager) notify() { status := sm.Snapshot() + sm.mutex.Lock() for _, ch := range sm.statusChans { ch <- status } + sm.mutex.Unlock() } // Start starts the status manager. @@ -59,24 +65,18 @@ func (sm *statusManager) Stop() { // IncFailed increments the number of failed jobs. func (sm *statusManager) IncFailed() { - sm.mutex.Lock() - sm.numFailed++ - sm.mutex.Unlock() + sm.numFailed.Add(1) } // IncSucceed increments the number of succeed jobs. func (sm *statusManager) IncSucceed() { - sm.mutex.Lock() - sm.numSucceed++ - sm.mutex.Unlock() + sm.numSucceed.Add(1) } // SetTotal sets the total number of jobs. // It should be called before the job starts. func (sm *statusManager) SetTotal(total int64) { - sm.mutex.Lock() - sm.numTotal = total - sm.mutex.Unlock() + sm.numTotal.Store(total) } // StatusChan returns a channel that will receive the status of the job. @@ -84,22 +84,24 @@ func (sm *statusManager) SetTotal(total int64) { // You can call it multiple times to get multiple channels. func (sm *statusManager) StatusChan() <-chan Status { ch := make(chan Status) + sm.mutex.Lock() sm.statusChans = append(sm.statusChans, ch) + sm.mutex.Unlock() return ch } // Snapshot returns the current status of the job. func (sm *statusManager) Snapshot() Status { - defer func() func() { - sm.mutex.Lock() - return func() { sm.mutex.Unlock() } - }()() - status := Status{ + sm.mutex.Lock() + numFailed := sm.numFailed.Load() + numSucceed := sm.numSucceed.Load() + numTotal := sm.numTotal.Load() + sm.mutex.Unlock() + return Status{ Timestamp: time.Now().Format(time.RFC3339), - NumFailed: sm.numFailed, - NumSucceed: sm.numSucceed, - NumFinished: sm.numFailed + sm.numSucceed, - NumTotal: sm.numTotal, + NumFailed: numFailed, + NumSucceed: numSucceed, + NumFinished: numFailed + numSucceed, + NumTotal: numTotal, } - return status }