Skip to content

Commit

Permalink
feat: support load data from s3
Browse files Browse the repository at this point in the history
  • Loading branch information
WangYihang committed Jul 2, 2024
1 parent f10ebcd commit d154bd3
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 9 deletions.
7 changes: 7 additions & 0 deletions examples/load-s3-file-as-input/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## Usage

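Pass an S3 URL of the following form as the input file path. The host part is the bucket name, the path is the object key, and `region`, `endpoint`, `access_key` and `secret_key` are given as query parameters (`endpoint` defaults to `s3.amazonaws.com` when omitted).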

```
s3://default/data.json?region=us-west-2&endpoint=s3.amazonaws.com&access_key=********************&secret_key=****************************************
```
19 changes: 19 additions & 0 deletions examples/load-s3-file-as-input/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package main

import (
"log/slog"

"github.com/WangYihang/gojob/pkg/utils"
)

// main logs every item produced by utils.Cat for a sample S3 URL, one log
// record per item, under the "line" attribute.
func main() {
	// NOTE(review): OpenS3File in pkg/utils takes the bucket from the URL host
	// ("example" here) and does not read a "bucket" query parameter; confirm
	// which bucket this sample is meant to address.
	const source = "s3://example/shakespeare.txt" +
		"?region=us-west-1" +
		"&bucket=default" +
		"&access_key=********************" +
		"&secret_key=********************************************"

	lines := utils.Cat(source)
	for text := range lines {
		slog.Info("s3", slog.String("line", text))
	}
}
15 changes: 13 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,27 @@ go 1.22
require (
github.com/google/uuid v1.6.0
github.com/jessevdk/go-flags v1.5.0
github.com/minio/minio-go/v7 v7.0.72
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/client_model v0.6.1
github.com/rabbitmq/amqp091-go v1.10.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/klauspost/compress v1.17.6 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
golang.org/x/sys v0.17.0 // indirect
github.com/rs/xid v1.5.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
38 changes: 32 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,29 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LFvc=
github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI=
github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/minio/minio-go/v7 v7.0.72 h1:ZSbxs2BfJensLyHdVOgHv+pfmvxYraaUy07ER04dWnA=
github.com/minio/minio-go/v7 v7.0.72/go.mod h1:4yBA8v80xGA30cfM3fz0DKYMXunWl/AV/6tWEs9ryzo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
Expand All @@ -16,12 +33,21 @@ github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSz
github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
79 changes: 78 additions & 1 deletion pkg/utils/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ package utils

import (
"bufio"
"context"
"fmt"
"io"
"log/slog"
"net/url"
"os"
"path/filepath"
"strings"

"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)

// Head takes a channel and returns a channel with the first n items
Expand Down Expand Up @@ -108,7 +114,78 @@ func (wc ReadDiscardCloser) Close() error {
}

func OpenFile(path string) (io.WriteCloser, error) {
return OpenLocalFile(path)
protocol, err := ParseProtocol(path)
if err != nil {
return nil, err
}
switch protocol {
case "s3":
slog.Info("opening s3 file", slog.String("path", path))
return OpenS3File(path)
case "file":
slog.Info("opening local file", slog.String("path", path))
return OpenLocalFile(path)
default:
slog.Warn("unsupported protocol", slog.String("protocol", protocol))
return nil, fmt.Errorf("unsupported protocol: %s", protocol)
}
}

// OpenS3File downloads an S3 object into a local temporary file and returns
// that file opened through OpenLocalFile.
//
// path is a URL of the form
//
//	s3://<bucket>/<object-key>?region=...&endpoint=...&access_key=...&secret_key=...
//
// The URL host is used as the bucket name and the URL path, with leading
// slashes removed, as the object key. endpoint defaults to
// "s3.amazonaws.com" when it is missing or empty. The client is always
// created with Secure set to true.
//
// The temporary file is created with os.CreateTemp("", "gojob-*"). It is
// removed again if the download fails; after a successful return it is left
// on disk.
//
// An error is returned if path cannot be parsed, the client cannot be
// created, the object cannot be fetched, or the temporary file cannot be
// written.
func OpenS3File(path string) (io.WriteCloser, error) {
	parsed, err := url.Parse(path)
	if err != nil {
		// *url.Error embeds the whole input URL in its message, which would
		// expose the secret key; report only the underlying cause.
		if urlErr, ok := err.(*url.Error); ok {
			err = urlErr.Err
		}
		return nil, fmt.Errorf("parsing s3 path: %w", err)
	}
	bucketName := parsed.Host
	objectKey := strings.TrimLeft(parsed.Path, "/")
	query := parsed.Query()
	endpoint := query.Get("endpoint")
	if endpoint == "" {
		endpoint = "s3.amazonaws.com"
	}
	accessKey := query.Get("access_key")
	secretKey := query.Get("secret_key")
	region := query.Get("region")
	// The secret key is deliberately left out of the log record.
	slog.Info(
		"parsed s3 path",
		slog.String("access_key", accessKey),
		slog.String("bucket", bucketName),
		slog.String("object", objectKey),
		slog.String("endpoint", endpoint),
		slog.String("region", region),
	)

	// Download file from S3 into a temporary file
	slog.Info("downloading file from s3", slog.String("bucket", bucketName), slog.String("object", objectKey))
	s3Client, err := minio.New(endpoint, &minio.Options{
		Creds:  credentials.NewStaticV4(accessKey, secretKey, ""),
		Secure: true,
		Region: region,
	})
	if err != nil {
		return nil, fmt.Errorf("creating s3 client: %w", err)
	}
	reader, err := s3Client.GetObject(context.Background(), bucketName, objectKey, minio.GetObjectOptions{})
	if err != nil {
		return nil, fmt.Errorf("getting s3 object: %w", err)
	}
	defer reader.Close()

	tmp, err := os.CreateTemp("", "gojob-*")
	if err != nil {
		return nil, fmt.Errorf("creating temporary file: %w", err)
	}
	if _, err := io.Copy(tmp, reader); err != nil {
		// Do not leave a partial download behind.
		tmp.Close()
		os.Remove(tmp.Name())
		return nil, fmt.Errorf("downloading s3 object: %w", err)
	}
	// Close before reopening so a failed close is reported rather than lost.
	if err := tmp.Close(); err != nil {
		os.Remove(tmp.Name())
		return nil, fmt.Errorf("closing temporary file: %w", err)
	}

	// Open the temporary file
	slog.Info("opening local file", slog.String("path", tmp.Name()))
	return OpenLocalFile(tmp.Name())
}

func OpenLocalFile(path string) (io.WriteCloser, error) {
Expand Down
8 changes: 8 additions & 0 deletions pkg/utils/url.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package utils

import "net/url"

// ParseProtocol returns the URL scheme of uri, e.g. "s3" for
// "s3://bucket/key". A uri without a scheme, such as a plain file path,
// yields an empty protocol and a nil error.
//
// If uri cannot be parsed, the error from url.Parse is returned together
// with an empty protocol.
func ParseProtocol(uri string) (protocol string, err error) {
	parsed, err := url.Parse(uri)
	if err != nil {
		// url.Parse returns a nil *url.URL on failure, so it must not be
		// dereferenced here.
		return "", err
	}
	return parsed.Scheme, nil
}

0 comments on commit d154bd3

Please sign in to comment.