forked from billglover/golang-etl
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
116 lines (93 loc) · 2.17 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package main
import (
"encoding/csv"
"fmt"
"os"
"strconv"
"sync"
"time"
)
func main() {
start := time.Now()
eChan := make(chan *Order)
tChan := make(chan *Order)
done := make(chan bool)
go extract(eChan)
go transform(eChan, tChan)
go load(tChan, done)
<-done
fmt.Println(time.Since(start))
}
// Product holds details of a product
type Product struct {
PartNumber string
UnitCost float64
UnitPrice float64
}
// Order holds details of a customer order
type Order struct {
CustomerNumber int
PartNumber string
Quantity int
UnitCost float64
UnitPrice float64
}
func extract(eChan chan *Order) {
f, _ := os.Open("./orders.txt")
defer f.Close()
r := csv.NewReader(f)
for record, err := r.Read(); err == nil; record, err = r.Read() {
o := new(Order)
o.CustomerNumber, _ = strconv.Atoi(record[0])
o.PartNumber = record[1]
o.Quantity, _ = strconv.Atoi(record[2])
eChan <- o
}
close(eChan)
}
func transform(eChan, tChan chan *Order) {
f, _ := os.Open("./productList.txt")
defer f.Close()
r := csv.NewReader(f)
records, _ := r.ReadAll()
productList := make(map[string]*Product)
for _, r := range records {
p := new(Product)
p.PartNumber = r[0]
p.UnitCost, _ = strconv.ParseFloat(r[1], 64)
p.UnitPrice, _ = strconv.ParseFloat(r[2], 64)
productList[p.PartNumber] = p
}
var wg sync.WaitGroup
for o := range eChan {
wg.Add(1)
go func(o *Order) {
time.Sleep(3 * time.Millisecond)
o.UnitCost = productList[o.PartNumber].UnitCost
o.UnitPrice = productList[o.PartNumber].UnitPrice
tChan <- o
defer wg.Done()
}(o)
}
wg.Wait()
close(tChan)
}
func load(tChan chan *Order, done chan bool) {
f, _ := os.Create("./dest.txt")
defer f.Close()
fmt.Fprintf(f, "%20s%16s%13s%13s%16s%16s", "Part Number", "Quantity",
"Unit Cost", "Unit Price", "Total Cost", "Total Price\n")
var wg sync.WaitGroup
for o := range tChan {
wg.Add(1)
go func(o *Order) {
time.Sleep(1 * time.Millisecond)
fmt.Fprintf(f, "%20s %15d %12.2f %12.2f %15.2f%15.2f\n",
o.PartNumber, o.Quantity, o.UnitCost, o.UnitPrice,
o.UnitCost*float64(o.Quantity), o.UnitPrice*float64(o.UnitPrice))
defer wg.Done()
}(o)
}
wg.Wait()
done <- true
}