From cdfe7563ef553f9d91737da4be0b232986a74ce5 Mon Sep 17 00:00:00 2001 From: Daniel Hu Date: Thu, 10 Aug 2023 17:07:00 +0800 Subject: [PATCH] Improve task execution and testing in goPool - Modified the `executeTaskWithTimeout` function in `worker.go` to stop the task when the context is cancelled. - Updated the `Wait` function in `gopool.go` to wait until all tasks are completed, not just dispatched. - Added a new test case `TestGoPoolWithTimeout` in `gopool_test.go` to test the behavior of goPool when a task times out. - Used the existing `lock` field in `goPool` to protect concurrent access to `workerStack` in `Wait` and `popWorker` functions. Signed-off-by: Daniel Hu --- gopool.go | 12 ++++++++++-- gopool_test.go | 23 +++++++++++++++++++++++ worker.go | 16 ++++++++++++---- 3 files changed, 45 insertions(+), 6 deletions(-) diff --git a/gopool.go b/gopool.go index fe32d6a..068fb15 100644 --- a/gopool.go +++ b/gopool.go @@ -84,9 +84,17 @@ func (p *goPool) AddTask(t task) { p.taskQueue <- t } -// Wait waits for all tasks to be dispatched. +// Wait waits for all tasks to be dispatched and completed. func (p *goPool) Wait() { - for len(p.taskQueue) > 0 { + for { + p.lock.Lock() + workerStackLen := len(p.workerStack) + p.lock.Unlock() + + if len(p.taskQueue) == 0 && workerStackLen == len(p.workers) { + break + } + time.Sleep(100 * time.Millisecond) } } diff --git a/gopool_test.go b/gopool_test.go index f9a0c38..b18bd7e 100644 --- a/gopool_test.go +++ b/gopool_test.go @@ -149,3 +149,26 @@ func TestGoPoolWithRetry(t *testing.T) { t.Errorf("Expected task to run %v times, but it ran %v times", retryCount+1, taskRunCount) } } + +func TestGoPoolWithTimeout(t *testing.T) { + var taskRun int32 + + pool := NewGoPool(100, WithTimeout(100*time.Millisecond), WithErrorCallback(func(err error) { + if err.Error() != "task timed out" { + t.Errorf("Expected error 'task timed out', but got %v", err) + } + atomic.StoreInt32(&taskRun, 1) + })) + defer pool.Release() + + pool.AddTask(func() (interface{}, error) { + time.Sleep(200 * time.Millisecond) + return nil, nil + }) + + pool.Wait() + + if atomic.LoadInt32(&taskRun) == 0 { + t.Errorf("Expected task to run and timeout, but it did not run") + } +} diff --git a/worker.go b/worker.go index 999288c..ce69d34 100644 --- a/worker.go +++ b/worker.go @@ -54,17 +54,25 @@ func (w *worker) executeTaskWithTimeout(t task, pool *goPool) (result interface{ defer cancel() // Create a channel to receive the result of the task - done := make(chan struct{}) + resultChan := make(chan interface{}) + errChan := make(chan error) // Run the task in a separate goroutine go func() { - result, err = t() - close(done) + res, err := t() + select { + case resultChan <- res: + case errChan <- err: + case <-ctx.Done(): + // The context was cancelled, stop the task + return + } }() // Wait for the task to finish or for the context to timeout select { - case <-done: + case result = <-resultChan: + err = <-errChan // The task finished successfully return result, err case <-ctx.Done():