Skip to content

Commit

Permalink
bug: alleviate the data race between Release() and Reboot()
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Jun 17, 2024
1 parent 1933478 commit d84b9f5
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 9 deletions.
10 changes: 5 additions & 5 deletions ants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,15 +503,15 @@ func TestMaxBlockingSubmitWithFunc(t *testing.T) {

func TestRebootDefaultPool(t *testing.T) {
defer Release()
Reboot()
Reboot() // should do nothing inside
var wg sync.WaitGroup
wg.Add(1)
_ = Submit(func() {
demoFunc()
wg.Done()
})
wg.Wait()
Release()
assert.NoError(t, ReleaseTimeout(time.Second))
assert.EqualError(t, Submit(nil), ErrPoolClosed.Error(), "pool should be closed")
Reboot()
wg.Add(1)
Expand All @@ -530,7 +530,7 @@ func TestRebootNewPool(t *testing.T) {
wg.Done()
})
wg.Wait()
p.Release()
assert.NoError(t, p.ReleaseTimeout(time.Second))
assert.EqualError(t, p.Submit(nil), ErrPoolClosed.Error(), "pool should be closed")
p.Reboot()
wg.Add(1)
Expand All @@ -546,7 +546,7 @@ func TestRebootNewPool(t *testing.T) {
wg.Add(1)
_ = p1.Invoke(1)
wg.Wait()
p1.Release()
assert.NoError(t, p1.ReleaseTimeout(time.Second))
assert.EqualError(t, p1.Invoke(nil), ErrPoolClosed.Error(), "pool should be closed")
p1.Reboot()
wg.Add(1)
Expand Down Expand Up @@ -975,7 +975,7 @@ func TestReleaseTimeout(t *testing.T) {
}

func TestDefaultPoolReleaseTimeout(t *testing.T) {
Reboot()
Reboot() // should do nothing inside
for i := 0; i < 5; i++ {
_ = Submit(func() {
time.Sleep(time.Second)
Expand Down
8 changes: 6 additions & 2 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ func (p *Pool) purgeStaleWorkers() {
atomic.StoreInt32(&p.purgeDone, 1)
}()

purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot()
for {
select {
case <-p.purgeCtx.Done():
case <-purgeCtx.Done():
return
case <-ticker.C:
}
Expand Down Expand Up @@ -344,7 +345,10 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
}
}

// Reboot reboots a closed pool.
// Reboot reboots a closed pool, it does nothing if the pool is not closed.
// If you intend to reboot a closed pool, use ReleaseTimeout() instead of
// Release() to ensure that all workers are stopped and resource are released
// before rebooting, otherwise you may run into data race.
func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
atomic.StoreInt32(&p.purgeDone, 0)
Expand Down
8 changes: 6 additions & 2 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ func (p *PoolWithFunc) purgeStaleWorkers() {
atomic.StoreInt32(&p.purgeDone, 1)
}()

purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot()
for {
select {
case <-p.purgeCtx.Done():
case <-purgeCtx.Done():
return
case <-ticker.C:
}
Expand Down Expand Up @@ -308,7 +309,10 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
}
}

// Reboot reboots a closed pool.
// Reboot reboots a closed pool, it does nothing if the pool is not closed.
// If you intend to reboot a closed pool, use ReleaseTimeout() instead of
// Release() to ensure that all workers are stopped and resource are released
// before rebooting, otherwise you may run into data race.
func (p *PoolWithFunc) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
atomic.StoreInt32(&p.purgeDone, 0)
Expand Down

0 comments on commit d84b9f5

Please sign in to comment.