forked from Surfline/badgerutils
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwriter_test.go
98 lines (86 loc) · 2.26 KB
/
writer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package badgerutils
import (
"bytes"
"encoding/gob"
"fmt"
"github.com/dgraph-io/badger"
"github.com/stretchr/testify/require"
"io/ioutil"
"os"
"strings"
"testing"
)
// sampleRecord is a three-field test fixture. The field order matters:
// positional composite literals elsewhere in this file rely on it.
type sampleRecord struct {
	Field1, Field2, Field3 string
}

// Key returns the record's three fields joined by commas, in declaration
// order.
func (rec sampleRecord) Key() string {
	key := fmt.Sprintf("%v,%v,%v", rec.Field1, rec.Field2, rec.Field3)
	return key
}
// csvToSampleRecord splits line on commas and builds a sampleRecord from the
// first three fields; any additional fields are ignored.
//
// It returns an error, rather than panicking on an out-of-range index, when
// the line contains fewer than three comma-separated fields.
func csvToSampleRecord(line string) (Keyed, error) {
	values := strings.Split(line, ",")
	if len(values) < 3 {
		return nil, fmt.Errorf("expected at least 3 comma-separated fields, got %d in %q", len(values), line)
	}
	return sampleRecord{values[0], values[1], values[2]}, nil
}
// readDB opens the Badger database stored in dir (used as both Dir and
// ValueDir), walks every entry in iterator order inside a single View
// transaction, and gob-decodes each value into a sampleRecord.
//
// It returns the decoded records in iteration order, or nil and the first
// error hit while opening the database, reading a value, or decoding it.
// The database is closed before readDB returns.
func readDB(dir string) ([]sampleRecord, error) {
	opts := badger.DefaultOptions
	opts.Dir = dir
	opts.ValueDir = dir
	db, err := badger.Open(opts)
	if err != nil {
		return nil, err
	}
	defer db.Close()

	// Non-nil even when the database is empty.
	sampleRecords := make([]sampleRecord, 0)

	// Everything runs on the calling goroutine, so an early error return
	// cannot leave a blocked sender (and an open transaction) behind.
	err = db.View(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.DefaultIteratorOptions)
		defer it.Close()
		for it.Rewind(); it.Valid(); it.Next() {
			value, err := it.Item().Value()
			if err != nil {
				return err
			}
			// Decode before the iterator advances so the value bytes are
			// never read after their item has been left behind.
			var sr sampleRecord
			if err := gob.NewDecoder(bytes.NewReader(value)).Decode(&sr); err != nil {
				return err
			}
			sampleRecords = append(sampleRecords, sr)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return sampleRecords, nil
}
// TestWriteInput feeds three CSV lines through writeInput into a temporary
// Badger directory created under the current working directory, then reads
// the database back with readDB and checks that the three records come back
// in order.
func TestWriteInput(t *testing.T) {
	dir, err := os.Getwd()
	require.NoError(t, err)
	tmpDir, err := ioutil.TempDir(dir, "temp")
	require.NoError(t, err)
	defer os.RemoveAll(tmpDir)

	reader := strings.NewReader(`field11,field12,field13
field21,field22,field23
field31,field32,field33`)

	// NOTE(review): the meaning of the third argument (2) is defined by
	// writeInput, which is not visible in this file.
	err = writeInput(reader, tmpDir, 2, csvToSampleRecord)
	require.NoError(t, err)

	writtenSampleRecords, err := readDB(tmpDir)
	require.NoError(t, err)
	require.Len(t, writtenSampleRecords, 3)

	// testify takes (expected, actual); keeping that order makes failure
	// output label the two values correctly.
	require.EqualValues(t, sampleRecord{"field11", "field12", "field13"}, writtenSampleRecords[0])
	require.EqualValues(t, sampleRecord{"field21", "field22", "field23"}, writtenSampleRecords[1])
	require.EqualValues(t, sampleRecord{"field31", "field32", "field33"}, writtenSampleRecords[2])
}