Skip to content

Commit

Permalink
feat: add timeout for task.Do
Browse files Browse the repository at this point in the history
  • Loading branch information
WangYihang committed Jan 17, 2024
1 parent 042d340 commit 6276dfe
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 9 deletions.
3 changes: 2 additions & 1 deletion example/complex-http-crawler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
type Options struct {
InputFilePath string `short:"i" long:"input" description:"input file path" required:"true"`
OutputFilePath string `short:"o" long:"output" description:"output file path" required:"true"`
TimeoutPerTask int `short:"t" long:"timeout" description:"timeout per task in seconds" default:"8"`
NumWorkers int `short:"n" long:"num-workers" description:"number of workers" default:"32"`
}

Expand All @@ -24,7 +25,7 @@ func init() {
}

func main() {
scheduler := gojob.NewScheduler(opts.NumWorkers, opts.OutputFilePath)
scheduler := gojob.NewScheduler(opts.NumWorkers, opts.TimeoutPerTask, opts.OutputFilePath)
for line := range gojob.Cat(opts.InputFilePath) {
scheduler.Submit(model.New(string(line)))
}
Expand Down
2 changes: 1 addition & 1 deletion example/simple-http-crawler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (t *MyTask) NeedRetry() bool {
}

func main() {
scheduler := gojob.NewScheduler(1, "output.txt")
scheduler := gojob.NewScheduler(1, 8, "output.txt")
scheduler.Start()
for line := range gojob.Cat("input.txt") {
scheduler.Submit(New(line))
Expand Down
35 changes: 28 additions & 7 deletions gojob.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package gojob

import (
"bufio"
"context"
"log/slog"
"os"
"path/filepath"
"strings"
"sync"
"time"
)

// Fanin takes a slice of channels and returns a single channel that
Expand Down Expand Up @@ -162,6 +164,7 @@ type Task interface {
// Scheduler is a task scheduler
type Scheduler struct {
NumWorkers int
TimeoutSeconds int
OutputFilePath string
TaskChan chan Task
LogChan chan string
Expand All @@ -170,9 +173,10 @@ type Scheduler struct {
}

// NewScheduler creates a new scheduler
func NewScheduler(numWorkers int, outputFilePath string) *Scheduler {
func NewScheduler(numWorkers int, timeoutSeconds int, outputFilePath string) *Scheduler {
return &Scheduler{
NumWorkers: numWorkers,
TimeoutSeconds: timeoutSeconds,
OutputFilePath: outputFilePath,
TaskChan: make(chan Task, 1024),
LogChan: make(chan string, 1024),
Expand Down Expand Up @@ -206,14 +210,13 @@ func (s *Scheduler) Wait() {
// Worker is a worker
func (s *Scheduler) Worker() {
for task := range s.TaskChan {
// do task
err := task.Do()
RunWithTimeout(task.Do, time.Duration(s.TimeoutSeconds)*time.Second)
// check if retry is needed
if err != nil && task.NeedRetry() {
if task.NeedRetry() {
s.taskWg.Add(1)
go func() {
s.TaskChan <- task
}()
go func(t Task) {
s.TaskChan <- t
}(task)
}
// put log to log channel
data, err := task.Bytes()
Expand Down Expand Up @@ -255,3 +258,21 @@ func (s *Scheduler) Writer() {
s.logWg.Done()
}
}

func RunWithTimeout(f func() error, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

done := make(chan error, 1)

go func() {
done <- f()
}()

select {
case <-ctx.Done():
return ctx.Err()
case err := <-done:
return err
}
}

0 comments on commit 6276dfe

Please sign in to comment.