diff --git a/example/complex-http-crawler/main.go b/example/complex-http-crawler/main.go index 36aed71..fd6b01b 100644 --- a/example/complex-http-crawler/main.go +++ b/example/complex-http-crawler/main.go @@ -5,6 +5,7 @@ import ( "github.com/WangYihang/gojob" "github.com/WangYihang/gojob/example/complex-http-crawler/pkg/model" + "github.com/WangYihang/gojob/pkg/util" "github.com/jessevdk/go-flags" ) @@ -27,7 +28,7 @@ func init() { func main() { scheduler := gojob.NewScheduler(opts.NumWorkers, opts.MaxRuntimePerTaskSeconds, opts.MaxRetries, opts.OutputFilePath) - for line := range gojob.Cat(opts.InputFilePath) { + for line := range util.Cat(opts.InputFilePath) { scheduler.Submit(model.New(string(line))) } scheduler.Start() diff --git a/example/simple-http-crawler/main.go b/example/simple-http-crawler/main.go index 62499ed..4a01ace 100644 --- a/example/simple-http-crawler/main.go +++ b/example/simple-http-crawler/main.go @@ -4,6 +4,7 @@ import ( "net/http" "github.com/WangYihang/gojob" + "github.com/WangYihang/gojob/pkg/util" ) type MyTask struct { @@ -29,7 +30,7 @@ func (t *MyTask) Do() error { func main() { scheduler := gojob.NewScheduler(1, 4, 8, "output.txt") - for line := range gojob.Cat("input.txt") { + for line := range util.Cat("input.txt") { scheduler.Submit(New(line)) } scheduler.Wait() diff --git a/gojob.go b/gojob.go index 5a652dd..35b934e 100644 --- a/gojob.go +++ b/gojob.go @@ -1,155 +1,15 @@ package gojob import ( - "bufio" "context" "encoding/json" "log/slog" "os" "path/filepath" - "strings" "sync" "time" ) -// Fanin takes a slice of channels and returns a single channel that -func Fanin[T interface{}](cs []chan T) chan T { - var wg sync.WaitGroup - out := make(chan T) - output := func(c chan T) { - for n := range c { - out <- n - } - wg.Done() - } - wg.Add(len(cs)) - for _, c := range cs { - go output(c) - } - go func() { - wg.Wait() - close(out) - }() - return out -} - -// Fanout takes a channel and returns a slice of channels -// the item in the input channel will be distributed to the output channels -func Fanout[T interface{}](in chan *T, n int) []chan *T { - cs := make([]chan *T, n) - for i := 0; i < n; i++ { - cs[i] = make(chan *T) - go func(c chan *T) { - for n := range in { - c <- n - } - close(c) - }(cs[i]) - } - return cs -} - -// Head takes a channel and returns a channel with the first n items -func Head[T interface{}](in chan T, max int) chan T { - out := make(chan T) - go func() { - defer close(out) - i := 0 - for line := range in { - if i >= max { - break - } - out <- line - i++ - } - }() - return out -} - -// Tail takes a channel and returns a channel with the last n items -func Tail[T interface{}](in chan T, max int) chan T { - out := make(chan T) - go func() { - defer close(out) - var lines []T - for line := range in { - lines = append(lines, line) - if len(lines) > max { - lines = lines[1:] - } - } - for _, line := range lines { - out <- line - } - }() - return out -} - -// Cat takes a file path and returns a channel with the lines of the file -// Spaces are trimmed from the beginning and end of each line -func Cat(filePath string) <-chan string { - out := make(chan string) - - go func() { - defer close(out) // Ensure the channel is closed when the goroutine finishes - - // Open the file - file, err := os.Open(filePath) - if err != nil { - slog.Error("error occured while opening file", slog.String("path", filePath), slog.String("error", err.Error())) - return // Close the channel and exit the goroutine - } - defer file.Close() - - scanner := bufio.NewScanner(file) - for scanner.Scan() { - out <- strings.TrimSpace(scanner.Text()) // Send the line to the channel - } - - // Check for errors during Scan, excluding EOF - if err := scanner.Err(); err != nil { - slog.Error("error occured while reading file", slog.String("path", filePath), slog.String("error", err.Error())) - } - }() - - return out -} - -// Filter takes a channel and returns a channel with the items that pass the filter -func Filter[T interface{}](in chan T, f func(T) bool) chan T { - out := make(chan T) - go func() { - defer close(out) - for line := range in { - if f(line) { - out <- line - } - } - }() - return out -} - -// Map takes a channel and returns a channel with the items that pass the filter -func Map[T interface{}, U interface{}](in chan T, f func(T) U) chan U { - out := make(chan U) - go func() { - defer close(out) - for line := range in { - out <- f(line) - } - }() - return out -} - -// Reduce takes a channel and returns a channel with the items that pass the filter -func Reduce[T interface{}](in chan T, f func(T, T) T) T { - var result T - for line := range in { - result = f(result, line) - } - return result -} - // Task is an interface that defines a task type Task interface { // Do starts the task diff --git a/pkg/util/channel.go b/pkg/util/channel.go new file mode 100644 index 0000000..16386e8 --- /dev/null +++ b/pkg/util/channel.go @@ -0,0 +1,54 @@ +package util + +import "sync" + +// Fanin takes a slice of channels and returns a single channel that +func Fanin[T interface{}](cs []chan T) chan T { + var wg sync.WaitGroup + out := make(chan T) + output := func(c chan T) { + for n := range c { + out <- n + } + wg.Done() + } + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + go func() { + wg.Wait() + close(out) + }() + return out +} + +// Fanout takes a channel and returns a slice of channels +// the item in the input channel will be distributed to the output channels +func Fanout[T interface{}](in chan *T, n int) []chan *T { + cs := make([]chan *T, n) + for i := 0; i < n; i++ { + cs[i] = make(chan *T) + go func(c chan *T) { + for n := range in { + c <- n + } + close(c) + }(cs[i]) + } + return cs +} + +// Filter takes a channel and returns a channel with the items that pass the filter +func Filter[T interface{}](in chan T, f func(T) bool) chan T { + out := make(chan T) + go func() { + defer close(out) + for line := range in { + if f(line) { + out <- line + } + } + }() + return out +} diff --git a/pkg/util/io.go b/pkg/util/io.go new file mode 100644 index 0000000..3b7fbe5 --- /dev/null +++ b/pkg/util/io.go @@ -0,0 +1,74 @@ +package util + +import ( + "bufio" + "log/slog" + "os" + "strings" +) + +// Head takes a channel and returns a channel with the first n items +func Head[T interface{}](in chan T, max int) chan T { + out := make(chan T) + go func() { + defer close(out) + i := 0 + for line := range in { + if i >= max { + break + } + out <- line + i++ + } + }() + return out +} + +// Tail takes a channel and returns a channel with the last n items +func Tail[T interface{}](in chan T, max int) chan T { + out := make(chan T) + go func() { + defer close(out) + var lines []T + for line := range in { + lines = append(lines, line) + if len(lines) > max { + lines = lines[1:] + } + } + for _, line := range lines { + out <- line + } + }() + return out +} + +// Cat takes a file path and returns a channel with the lines of the file +// Spaces are trimmed from the beginning and end of each line +func Cat(filePath string) <-chan string { + out := make(chan string) + + go func() { + defer close(out) // Ensure the channel is closed when the goroutine finishes + + // Open the file + file, err := os.Open(filePath) + if err != nil { + slog.Error("error occured while opening file", slog.String("path", filePath), slog.String("error", err.Error())) + return // Close the channel and exit the goroutine + } + defer file.Close() + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + out <- strings.TrimSpace(scanner.Text()) // Send the line to the channel + } + + // Check for errors during Scan, excluding EOF + if err := scanner.Err(); err != nil { + slog.Error("error occured while reading file", slog.String("path", filePath), slog.String("error", err.Error())) + } + }() + + return out +} diff --git a/pkg/util/mapreduce.go b/pkg/util/mapreduce.go new file mode 100644 index 0000000..ffc5bf2 --- /dev/null +++ b/pkg/util/mapreduce.go @@ -0,0 +1,22 @@ +package util + +// Map takes a channel and returns a channel with the items that pass the filter +func Map[T interface{}, U interface{}](in chan T, f func(T) U) chan U { + out := make(chan U) + go func() { + defer close(out) + for line := range in { + out <- f(line) + } + }() + return out +} + +// Reduce takes a channel and returns a channel with the items that pass the filter +func Reduce[T interface{}](in chan T, f func(T, T) T) T { + var result T + for line := range in { + result = f(result, line) + } + return result +}