Skip to content

Commit

Permalink
refactor: separate channel/io/mapreduce logic into pkg/util
Browse files Browse the repository at this point in the history
  • Loading branch information
WangYihang committed Feb 27, 2024
1 parent 2155fc5 commit 20d7c1f
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 142 deletions.
3 changes: 2 additions & 1 deletion example/complex-http-crawler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/WangYihang/gojob"
"github.com/WangYihang/gojob/example/complex-http-crawler/pkg/model"
"github.com/WangYihang/gojob/pkg/util"
"github.com/jessevdk/go-flags"
)

Expand All @@ -27,7 +28,7 @@ func init() {

func main() {
scheduler := gojob.NewScheduler(opts.NumWorkers, opts.MaxRuntimePerTaskSeconds, opts.MaxRetries, opts.OutputFilePath)
for line := range gojob.Cat(opts.InputFilePath) {
for line := range util.Cat(opts.InputFilePath) {
scheduler.Submit(model.New(string(line)))
}
scheduler.Start()
Expand Down
3 changes: 2 additions & 1 deletion example/simple-http-crawler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net/http"

"github.com/WangYihang/gojob"
"github.com/WangYihang/gojob/pkg/util"
)

type MyTask struct {
Expand All @@ -29,7 +30,7 @@ func (t *MyTask) Do() error {

func main() {
scheduler := gojob.NewScheduler(1, 4, 8, "output.txt")
for line := range gojob.Cat("input.txt") {
for line := range util.Cat("input.txt") {
scheduler.Submit(New(line))
}
scheduler.Wait()
Expand Down
140 changes: 0 additions & 140 deletions gojob.go
Original file line number Diff line number Diff line change
@@ -1,155 +1,15 @@
package gojob

import (
"bufio"
"context"
"encoding/json"
"log/slog"
"os"
"path/filepath"
"strings"
"sync"
"time"
)

// Fanin merges every channel in cs into a single output channel.
//
// One goroutine per input channel forwards values to the returned channel,
// so the relative order of values coming from different inputs is not
// defined. The returned channel is closed once every input channel has been
// closed and drained; when cs is empty it is closed without delivering
// anything.
func Fanin[T interface{}](cs []chan T) chan T {
	merged := make(chan T)

	var pending sync.WaitGroup
	pending.Add(len(cs))
	for _, src := range cs {
		go func(src chan T) {
			for item := range src {
				merged <- item
			}
			pending.Done()
		}(src)
	}

	// Close the merged channel only after all forwarders have finished.
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}

// Fanout spreads the items of in across n freshly created output channels.
//
// Each output channel is fed by its own goroutine that receives from in, so
// every item is delivered to exactly one output and the assignment of items
// to outputs depends on scheduling. Each output channel is closed after in
// has been closed and drained.
func Fanout[T interface{}](in chan *T, n int) []chan *T {
	// forward copies items from the shared input to one output channel.
	forward := func(dst chan *T) {
		for item := range in {
			dst <- item
		}
		close(dst)
	}

	outs := make([]chan *T, n)
	for idx := range outs {
		outs[idx] = make(chan *T)
		go forward(outs[idx])
	}
	return outs
}

// Head returns a channel that yields at most the first max items of in.
//
// The limit is checked before each receive, so nothing beyond the max-th
// item is taken from in; the remaining items stay available to other
// readers. The returned channel is closed once max items have been forwarded
// or in is closed, whichever happens first. A max of zero or less produces
// an empty, closed channel without reading from in at all.
//
// Head does not drain in after reaching the limit: a producer that keeps
// sending on in will block unless something else continues to receive.
func Head[T interface{}](in chan T, max int) chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for sent := 0; sent < max; sent++ {
			item, ok := <-in
			if !ok {
				// Input ended before the limit was reached.
				return
			}
			out <- item
		}
	}()
	return out
}

// Tail returns a channel that yields the last max items of in.
//
// The input is read until it is closed while a sliding window of at most
// max items is kept; only then are the retained items sent, in their
// original order, and the returned channel closed. Nothing is emitted before
// in is closed.
func Tail[T interface{}](in chan T, max int) chan T {
	out := make(chan T)
	emit := func() {
		defer close(out)

		var window []T
		for {
			item, ok := <-in
			if !ok {
				break
			}
			window = append(window, item)
			// Drop the oldest item once the window grows past the limit.
			if len(window) > max {
				window = window[1:]
			}
		}

		for i := 0; i < len(window); i++ {
			out <- window[i]
		}
	}
	go emit()
	return out
}

// Cat streams the lines of the file at filePath over the returned channel.
//
// Each line has leading and trailing whitespace removed before it is sent.
// The file is read by a background goroutine that closes the channel when it
// finishes. If the file cannot be opened, or the scanner reports an error
// while reading, the error is logged through slog and the channel is closed
// after whatever lines were already delivered.
func Cat(filePath string) <-chan string {
	lines := make(chan string)

	go func() {
		// Closing the channel is how consumers learn that reading is over.
		defer close(lines)

		f, openErr := os.Open(filePath)
		if openErr != nil {
			slog.Error("error occured while opening file", slog.String("path", filePath), slog.String("error", openErr.Error()))
			return
		}
		defer f.Close()

		sc := bufio.NewScanner(f)
		for {
			if !sc.Scan() {
				break
			}
			lines <- strings.TrimSpace(sc.Text())
		}

		// Scanner.Err is nil on a clean end of file.
		scanErr := sc.Err()
		if scanErr != nil {
			slog.Error("error occured while reading file", slog.String("path", filePath), slog.String("error", scanErr.Error()))
		}
	}()

	return lines
}

// Filter returns a channel carrying only the items of in for which f
// reports true.
//
// Items keep their original order. The returned channel is closed after in
// has been closed and drained.
func Filter[T interface{}](in chan T, f func(T) bool) chan T {
	kept := make(chan T)
	go func() {
		defer close(kept)
		for {
			item, ok := <-in
			if !ok {
				return
			}
			if !f(item) {
				continue
			}
			kept <- item
		}
	}()
	return kept
}

// Map returns a channel carrying f(item) for every item received from in.
//
// Results are sent in the same order the inputs arrive. The returned channel
// is closed after in has been closed and drained.
func Map[T interface{}, U interface{}](in chan T, f func(T) U) chan U {
	mapped := make(chan U)
	transform := func() {
		defer close(mapped)
		for {
			item, ok := <-in
			if !ok {
				return
			}
			mapped <- f(item)
		}
	}
	go transform()
	return mapped
}

// Reduce folds every item received from in into a single value.
//
// The accumulator starts as the zero value of T and is updated as
// acc = f(acc, item) in arrival order. Reduce blocks until in is closed and
// then returns the accumulator, so a closed, empty channel yields the zero
// value of T.
func Reduce[T interface{}](in chan T, f func(T, T) T) T {
	var acc T
	for {
		item, ok := <-in
		if !ok {
			return acc
		}
		acc = f(acc, item)
	}
}

// Task is an interface that defines a task
type Task interface {
// Do starts the task
Expand Down
54 changes: 54 additions & 0 deletions pkg/util/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package util

import "sync"

// Fanin merges every channel in cs into a single output channel.
//
// One goroutine per input channel forwards values to the returned channel,
// so the relative order of values coming from different inputs is not
// defined. The returned channel is closed once every input channel has been
// closed and drained; when cs is empty it is closed without delivering
// anything.
func Fanin[T interface{}](cs []chan T) chan T {
	merged := make(chan T)

	var pending sync.WaitGroup
	pending.Add(len(cs))
	for _, src := range cs {
		go func(src chan T) {
			for item := range src {
				merged <- item
			}
			pending.Done()
		}(src)
	}

	// Close the merged channel only after all forwarders have finished.
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}

// Fanout spreads the items of in across n freshly created output channels.
//
// Each output channel is fed by its own goroutine that receives from in, so
// every item is delivered to exactly one output and the assignment of items
// to outputs depends on scheduling. Each output channel is closed after in
// has been closed and drained.
func Fanout[T interface{}](in chan *T, n int) []chan *T {
	// forward copies items from the shared input to one output channel.
	forward := func(dst chan *T) {
		for item := range in {
			dst <- item
		}
		close(dst)
	}

	outs := make([]chan *T, n)
	for idx := range outs {
		outs[idx] = make(chan *T)
		go forward(outs[idx])
	}
	return outs
}

// Filter returns a channel carrying only the items of in for which f
// reports true.
//
// Items keep their original order. The returned channel is closed after in
// has been closed and drained.
func Filter[T interface{}](in chan T, f func(T) bool) chan T {
	kept := make(chan T)
	go func() {
		defer close(kept)
		for {
			item, ok := <-in
			if !ok {
				return
			}
			if !f(item) {
				continue
			}
			kept <- item
		}
	}()
	return kept
}
74 changes: 74 additions & 0 deletions pkg/util/io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package util

import (
"bufio"
"log/slog"
"os"
"strings"
)

// Head returns a channel that yields at most the first max items of in.
//
// The limit is checked before each receive, so nothing beyond the max-th
// item is taken from in; the remaining items stay available to other
// readers. The returned channel is closed once max items have been forwarded
// or in is closed, whichever happens first. A max of zero or less produces
// an empty, closed channel without reading from in at all.
//
// Head does not drain in after reaching the limit: a producer that keeps
// sending on in will block unless something else continues to receive.
func Head[T interface{}](in chan T, max int) chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for sent := 0; sent < max; sent++ {
			item, ok := <-in
			if !ok {
				// Input ended before the limit was reached.
				return
			}
			out <- item
		}
	}()
	return out
}

// Tail returns a channel that yields the last max items of in.
//
// The input is read until it is closed while a sliding window of at most
// max items is kept; only then are the retained items sent, in their
// original order, and the returned channel closed. Nothing is emitted before
// in is closed.
func Tail[T interface{}](in chan T, max int) chan T {
	out := make(chan T)
	emit := func() {
		defer close(out)

		var window []T
		for {
			item, ok := <-in
			if !ok {
				break
			}
			window = append(window, item)
			// Drop the oldest item once the window grows past the limit.
			if len(window) > max {
				window = window[1:]
			}
		}

		for i := 0; i < len(window); i++ {
			out <- window[i]
		}
	}
	go emit()
	return out
}

// Cat streams the lines of the file at filePath over the returned channel.
//
// Each line has leading and trailing whitespace removed before it is sent.
// The file is read by a background goroutine that closes the channel when it
// finishes. If the file cannot be opened, or the scanner reports an error
// while reading, the error is logged through slog and the channel is closed
// after whatever lines were already delivered.
func Cat(filePath string) <-chan string {
	lines := make(chan string)

	go func() {
		// Closing the channel is how consumers learn that reading is over.
		defer close(lines)

		f, openErr := os.Open(filePath)
		if openErr != nil {
			slog.Error("error occured while opening file", slog.String("path", filePath), slog.String("error", openErr.Error()))
			return
		}
		defer f.Close()

		sc := bufio.NewScanner(f)
		for {
			if !sc.Scan() {
				break
			}
			lines <- strings.TrimSpace(sc.Text())
		}

		// Scanner.Err is nil on a clean end of file.
		scanErr := sc.Err()
		if scanErr != nil {
			slog.Error("error occured while reading file", slog.String("path", filePath), slog.String("error", scanErr.Error()))
		}
	}()

	return lines
}
22 changes: 22 additions & 0 deletions pkg/util/mapreduce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package util

// Map returns a channel carrying f(item) for every item received from in.
//
// Results are sent in the same order the inputs arrive. The returned channel
// is closed after in has been closed and drained.
func Map[T interface{}, U interface{}](in chan T, f func(T) U) chan U {
	mapped := make(chan U)
	transform := func() {
		defer close(mapped)
		for {
			item, ok := <-in
			if !ok {
				return
			}
			mapped <- f(item)
		}
	}
	go transform()
	return mapped
}

// Reduce folds every item received from in into a single value.
//
// The accumulator starts as the zero value of T and is updated as
// acc = f(acc, item) in arrival order. Reduce blocks until in is closed and
// then returns the accumulator, so a closed, empty channel yields the zero
// value of T.
func Reduce[T interface{}](in chan T, f func(T, T) T) T {
	var acc T
	for {
		item, ok := <-in
		if !ok {
			return acc
		}
		acc = f(acc, item)
	}
}

0 comments on commit 20d7c1f

Please sign in to comment.