Skip to content

Commit

Permalink
Improve task execution and testing in goPool
Browse files Browse the repository at this point in the history
- Modified the `executeTaskWithTimeout` function in `worker.go` to stop the task when the context is cancelled.
- Updated the `Wait` function in `gopool.go` to wait until all tasks are completed, not just dispatched.
- Added a new test case `TestGoPoolWithTimeout` in `gopool_test.go` to test the behavior of goPool when a task times out.
- Used the existing `lock` field in `goPool` to protect concurrent access to `workerStack` in `Wait` and `popWorker` functions.

Signed-off-by: Daniel Hu <[email protected]>
  • Loading branch information
daniel-hutao committed Aug 10, 2023
1 parent 8ce152d commit cdfe756
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 6 deletions.
12 changes: 10 additions & 2 deletions gopool.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,17 @@ func (p *goPool) AddTask(t task) {
p.taskQueue <- t
}

// Wait waits for all tasks to be dispatched.
// Wait waits for all tasks to be dispatched and completed.
func (p *goPool) Wait() {
for len(p.taskQueue) > 0 {
for {
p.lock.Lock()
workerStackLen := len(p.workerStack)
p.lock.Unlock()

if len(p.taskQueue) == 0 && workerStackLen == len(p.workers) {
break
}

time.Sleep(100 * time.Millisecond)
}
}
Expand Down
23 changes: 23 additions & 0 deletions gopool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,26 @@ func TestGoPoolWithRetry(t *testing.T) {
t.Errorf("Expected task to run %v times, but it ran %v times", retryCount+1, taskRunCount)
}
}

func TestGoPoolWithTimeout(t *testing.T) {
var taskRun int32

pool := NewGoPool(100, WithTimeout(100*time.Millisecond), WithErrorCallback(func(err error) {
if err.Error() != "task timed out" {
t.Errorf("Expected error 'task timed out', but got %v", err)
}
atomic.StoreInt32(&taskRun, 1)
}))
defer pool.Release()

pool.AddTask(func() (interface{}, error) {
time.Sleep(200 * time.Millisecond)
return nil, nil
})

pool.Wait()

if atomic.LoadInt32(&taskRun) == 0 {
t.Errorf("Expected task to run and timeout, but it did not run")
}
}
16 changes: 12 additions & 4 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,25 @@ func (w *worker) executeTaskWithTimeout(t task, pool *goPool) (result interface{
defer cancel()

// Create a channel to receive the result of the task
done := make(chan struct{})
resultChan := make(chan interface{})
errChan := make(chan error)

// Run the task in a separate goroutine
go func() {
result, err = t()
close(done)
res, err := t()
select {
case resultChan <- res:
case errChan <- err:
case <-ctx.Done():
// The context was cancelled, stop the task
return
}
}()

// Wait for the task to finish or for the context to timeout
select {
case <-done:
case result = <-resultChan:
err = <-errChan
// The task finished successfully
return result, err
case <-ctx.Done():
Expand Down

0 comments on commit cdfe756

Please sign in to comment.