-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathparquet_writer.go
104 lines (85 loc) · 2.4 KB
/
parquet_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package deltago
import (
"context"
"github.com/csimplestring/delta-go/action"
"github.com/csimplestring/delta-go/errno"
"github.com/csimplestring/delta-go/internal/util/path"
pq "github.com/fraugster/parquet-go"
"github.com/fraugster/parquet-go/floor/interfaces"
parq "github.com/fraugster/parquet-go/parquet"
"github.com/fraugster/parquet-go/parquetschema"
"github.com/rotisserie/eris"
"gocloud.dev/blob"
_ "gocloud.dev/blob/azureblob"
_ "gocloud.dev/blob/fileblob"
_ "gocloud.dev/blob/gcsblob"
)
// parquetActionWriter is the sink used to persist actions as a parquet file.
//
// The expected call sequence is Open, any number of Write calls, then Close.
type parquetActionWriter interface {
	// Open prepares the writer to emit a new parquet file at path, laid out
	// according to the textual parquet schema definition in schema.
	Open(path, schema string) error

	// Write appends a single action to the currently opened file.
	Write(a *action.SingleAction) error

	// Close finishes the currently opened file and releases its resources.
	Close() error
}
// newParquetActionWriter returns a parquetActionWriter whose files are stored
// in the blob bucket identified by urlstr.
//
// urlstr is first translated with path.ConvertToBlobURL and the result is
// opened with blob.OpenBucket. Any error from either step is returned as-is.
//
// NOTE(review): the opened bucket is held by the returned writer and is not
// closed by any method visible in this file — confirm who owns its lifetime.
func newParquetActionWriter(urlstr string) (parquetActionWriter, error) {
	bucketURL, err := path.ConvertToBlobURL(urlstr)
	if err != nil {
		return nil, err
	}

	bucket, err := blob.OpenBucket(context.Background(), bucketURL)
	if err != nil {
		return nil, err
	}

	writer := &defaultParquetActionWriter{bucket: bucket}
	return writer, nil
}
// defaultParquetActionWriter writes actions as a SNAPPY-compressed parquet
// object into a gocloud blob bucket.
//
// The parquet encoder (fw) is layered on top of a blob writer (bw). The blob
// writer is created with a cancellable context so that a failed write can be
// aborted in Close instead of being committed half-written.
type defaultParquetActionWriter struct {
	// name is the key of the object most recently opened with Open.
	name string
	// bucket is the destination bucket; it is supplied by the constructor.
	bucket *blob.Bucket
	// bw is the blob writer for the object currently being written, or nil
	// when no object is open.
	bw *blob.Writer
	// fw is the parquet encoder writing into bw, or nil when no object is open.
	fw *pq.FileWriter
	// cancel cancels the context bw was created with. Cancelling it before
	// bw.Close is how an in-flight blob write is aborted.
	cancel context.CancelFunc
}

// Open starts a new parquet object at path using the parquet schema described
// by schemaString.
//
// It returns errno.FileAlreadyExists if an object already exists at path, and
// a wrapped error if schemaString cannot be parsed. Errors from the bucket are
// returned unchanged. On failure the writer's state is left untouched.
//
// Open does not close an object opened by an earlier call; call Close first.
//
// NOTE(review): the existence check and the creation of the writer are two
// separate bucket calls, so a concurrent writer could create the same key in
// between — confirm whether callers serialize checkpoint writes.
func (l *defaultParquetActionWriter) Open(path string, schemaString string) error {
	exists, err := l.bucket.Exists(context.Background(), path)
	if err != nil {
		return err
	}
	if exists {
		return errno.FileAlreadyExists(path)
	}

	// Parse the schema before touching the bucket so a bad schema never
	// leaves a dangling blob writer behind.
	schema, err := parquetschema.ParseSchemaDefinition(schemaString)
	if err != nil {
		return eris.Wrap(err, "parsing schema definition")
	}

	// The context outlives this call on purpose: it is cancelled from Close,
	// either to abort a failed write or to release it after a successful one.
	ctx, cancel := context.WithCancel(context.Background())
	bw, err := l.bucket.NewWriter(ctx, path, nil)
	if err != nil {
		cancel()
		return err
	}

	l.name = path
	l.bw = bw
	l.cancel = cancel
	l.fw = pq.NewFileWriter(bw,
		pq.WithSchemaDefinition(schema),
		pq.WithCompressionCodec(parq.CompressionCodec_SNAPPY))
	return nil
}

// Write marshals a into a parquet row using the schema of the open file and
// appends it to that file.
//
// It returns an error if no file is currently open, if marshalling fails, or
// if the parquet encoder rejects the row.
func (l *defaultParquetActionWriter) Write(a *action.SingleAction) error {
	if l.fw == nil {
		// Previously this dereferenced a nil encoder and panicked.
		return eris.New("parquet action writer is not open")
	}

	obj := interfaces.NewMarshallObjectWithSchema(nil, l.fw.GetSchemaDefinition())
	am := &actionMarshaller{a: a}
	if err := am.MarshalParquet(obj); err != nil {
		return err
	}

	if err := l.fw.AddData(obj.GetData()); err != nil {
		return eris.Wrap(err, "local parquet writer writing")
	}
	return nil
}

// Close finishes the parquet file and then closes the underlying blob writer.
//
// If finishing the parquet file fails, the blob write is aborted (its context
// is cancelled before the blob writer is closed) and the parquet error is
// returned. The blob writer is closed on every path so it is never leaked.
//
// Close is a no-op returning nil when no file is open, which also makes a
// second Close after a completed one harmless.
func (l *defaultParquetActionWriter) Close() error {
	if l.fw == nil || l.bw == nil {
		return nil
	}

	// Detach the state first so the writer is reusable (and Close idempotent)
	// regardless of which path below is taken.
	fw, bw, cancel := l.fw, l.bw, l.cancel
	l.fw, l.bw, l.cancel = nil, nil, nil

	release := func() {
		if cancel != nil {
			cancel()
		}
	}
	// Always release the context; after a successful bw.Close this only
	// frees the context's resources.
	defer release()

	if err := fw.Close(); err != nil {
		// The parquet footer was not written, so the data is unusable.
		// Cancel before closing so the blob write is aborted, not committed.
		release()
		// The error from closing an aborted writer is expected and carries
		// no extra information; the parquet error is the root cause.
		_ = bw.Close()
		return eris.Wrap(err, "parquet file writer close error")
	}

	if err := bw.Close(); err != nil {
		return eris.Wrap(err, "parquet blob writer close error")
	}
	return nil
}