From 6276dfe2ef5f1084698fbc94d539706592d3bdfb Mon Sep 17 00:00:00 2001 From: Yihang Wang Date: Wed, 17 Jan 2024 18:15:45 +0800 Subject: [PATCH] feat: add timeout for task.Do --- example/complex-http-crawler/main.go | 3 ++- example/simple-http-crawler/main.go | 2 +- gojob.go | 35 ++++++++++++++++++++++------ 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/example/complex-http-crawler/main.go b/example/complex-http-crawler/main.go index dac6e09..56666ef 100644 --- a/example/complex-http-crawler/main.go +++ b/example/complex-http-crawler/main.go @@ -11,6 +11,7 @@ import ( type Options struct { InputFilePath string `short:"i" long:"input" description:"input file path" required:"true"` OutputFilePath string `short:"o" long:"output" description:"output file path" required:"true"` + TimeoutPerTask int `short:"t" long:"timeout" description:"timeout per task in seconds" default:"8"` NumWorkers int `short:"n" long:"num-workers" description:"number of workers" default:"32"` } @@ -24,7 +25,7 @@ func init() { } func main() { - scheduler := gojob.NewScheduler(opts.NumWorkers, opts.OutputFilePath) + scheduler := gojob.NewScheduler(opts.NumWorkers, opts.TimeoutPerTask, opts.OutputFilePath) for line := range gojob.Cat(opts.InputFilePath) { scheduler.Submit(model.New(string(line))) } diff --git a/example/simple-http-crawler/main.go b/example/simple-http-crawler/main.go index 5d5d175..7c60808 100644 --- a/example/simple-http-crawler/main.go +++ b/example/simple-http-crawler/main.go @@ -50,7 +50,7 @@ func (t *MyTask) NeedRetry() bool { } func main() { - scheduler := gojob.NewScheduler(1, "output.txt") + scheduler := gojob.NewScheduler(1, 8, "output.txt") scheduler.Start() for line := range gojob.Cat("input.txt") { scheduler.Submit(New(line)) diff --git a/gojob.go b/gojob.go index 3945249..e6da2dd 100644 --- a/gojob.go +++ b/gojob.go @@ -2,11 +2,13 @@ package gojob import ( "bufio" + "context" "log/slog" "os" "path/filepath" "strings" "sync" + "time" ) // Fanin takes a slice of channels and returns a single channel that @@ -162,6 +164,7 @@ type Task interface { // Scheduler is a task scheduler type Scheduler struct { NumWorkers int + TimeoutSeconds int OutputFilePath string TaskChan chan Task LogChan chan string @@ -170,9 +173,10 @@ type Scheduler struct { } // NewScheduler creates a new scheduler -func NewScheduler(numWorkers int, outputFilePath string) *Scheduler { +func NewScheduler(numWorkers int, timeoutSeconds int, outputFilePath string) *Scheduler { return &Scheduler{ NumWorkers: numWorkers, + TimeoutSeconds: timeoutSeconds, OutputFilePath: outputFilePath, TaskChan: make(chan Task, 1024), LogChan: make(chan string, 1024), @@ -206,14 +210,13 @@ func (s *Scheduler) Wait() { // Worker is a worker func (s *Scheduler) Worker() { for task := range s.TaskChan { - // do task - err := task.Do() + RunWithTimeout(task.Do, time.Duration(s.TimeoutSeconds)*time.Second) // check if retry is needed - if err != nil && task.NeedRetry() { + if task.NeedRetry() { s.taskWg.Add(1) - go func() { - s.TaskChan <- task - }() + go func(t Task) { + s.TaskChan <- t + }(task) } // put log to log channel data, err := task.Bytes() @@ -255,3 +258,21 @@ func (s *Scheduler) Writer() { s.logWg.Done() } } + +func RunWithTimeout(f func() error, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + done := make(chan error, 1) + + go func() { + done <- f() + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-done: + return err + } +}