Skip to content

Commit

Permalink
feat: support read from gzip file
Browse files Browse the repository at this point in the history
  • Loading branch information
WangYihang committed Jul 17, 2024
1 parent 5d61082 commit 811fa9c
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 2 deletions.
Binary file added examples/read-from-gzip-file/data.txt.gz
Binary file not shown.
41 changes: 41 additions & 0 deletions examples/read-from-gzip-file/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package main

import (
"math/rand"
"time"

"github.com/WangYihang/gojob"
"github.com/WangYihang/gojob/pkg/utils"
)

type MyTask struct {
Line string
}

func New(line string) *MyTask {
return &MyTask{
Line: line,
}
}

func (t *MyTask) Do() error {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
return nil
}

func main() {
scheduler := gojob.New(
gojob.WithNumWorkers(8),
gojob.WithMaxRetries(4),
gojob.WithMaxRuntimePerTaskSeconds(16),
gojob.WithNumShards(4),
gojob.WithShard(0),
gojob.WithResultFilePath("-"),
gojob.WithStatusFilePath("status.json"),
).
Start()
for line := range utils.Cat("data.txt.gz") {
scheduler.Submit(New(line))
}
scheduler.Wait()
}
16 changes: 14 additions & 2 deletions pkg/utils/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package utils

import (
"bufio"
"compress/gzip"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -188,11 +189,12 @@ func OpenS3File(path string) (io.WriteCloser, error) {
return OpenLocalFile(fd.Name())
}

func OpenLocalFile(path string) (io.WriteCloser, error) {
func OpenLocalFile(path string) (io.ReadWriteCloser, error) {
switch path {
case "-":
return ReadDiscardCloser{Writer: os.Stdout, Reader: os.Stdin}, nil
case "":
// bug: input file can not be empty
return ReadDiscardCloser{Writer: io.Discard}, nil
default:
// Create folder
Expand All @@ -201,6 +203,16 @@ func OpenLocalFile(path string) (io.WriteCloser, error) {
return nil, err
}
// Open file
return os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
// Check the file extension, if it is .gz, use gzip.NewReadWriter
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
if strings.HasSuffix(path, ".gz") {
gzipFd, err := gzip.NewReader(fd)
if err != nil {
return nil, err
}
return ReadDiscardCloser{Writer: io.Discard, Reader: gzipFd}, nil
} else {
return fd, err
}
}
}

0 comments on commit 811fa9c

Please sign in to comment.