-
Notifications
You must be signed in to change notification settings - Fork 0
/
pipeline.go
156 lines (130 loc) · 2.94 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package pipeline
import (
"log"
"os"
"sync"
)
// Pipeline runs jobs through an ordered list of processing stages.
//
// Jobs are read from the entry channel (see Listen), passed through every
// stage registered with Process, and the results of the last stage are
// written to the output channel (see Output) when one is set. Run creates
// the channels that connect consecutive stages.
type Pipeline struct {
	// entry is the channel the first stage reads jobs from; set by Listen.
	entry chan interface{}
	// workspaces holds the stages in the order Process added them.
	workspaces []Workspace
	// buffer is the capacity of the intermediate channels Run creates
	// between consecutive stages.
	buffer int
	// connect holds the channels linking the stages, built by Run:
	// entry, one channel per stage boundary, then output.
	connect []chan interface{}
	// wg counts the running stages; Wait blocks on it.
	wg sync.WaitGroup
	// output receives the last stage's results followed by an EndJob;
	// when nil, those results are discarded.
	output chan interface{}
	// logger reports Run's configuration panics and handler errors.
	logger *log.Logger
}
// Workspace describes one stage of a Pipeline.
type Workspace struct {
	// worker is the maximum number of concurrent handle calls in this stage.
	worker int
	// handle processes one job. A non-nil error is logged and the job is
	// dropped; otherwise the result is passed on to the next stage.
	handle func(interface{}) (interface{}, error)
}
// EndJob is the sentinel value that marks the end of input on a pipeline
// channel. A stage that receives it stops reading. It then waits for its
// in-flight jobs and sends a single EndJob to the next stage or output.
type EndJob struct{}
// New returns an empty Pipeline. It has no stages, no output channel and
// unbuffered intermediate channels. Its logger writes to standard error
// using the standard log flags.
func New() *Pipeline {
	return &Pipeline{
		logger: log.New(os.Stderr, "", log.LstdFlags),
	}
}
// Listen sets ch as the entry channel from which the first stage reads jobs.
// Signal the end of input by sending an EndJob (see JobSendEnd) or by
// closing ch. It returns p so calls can be chained.
func (p *Pipeline) Listen(ch chan interface{}) *Pipeline {
	p.entry = ch
	return p
}
// Output sets ch as the channel that receives the last stage's results.
// A single EndJob follows once that stage has finished. Run panics unless
// cap(ch) is greater than the entry channel's capacity. Without an output
// channel the last stage's results are discarded. It returns p so calls
// can be chained.
func (p *Pipeline) Output(ch chan interface{}) *Pipeline {
	p.output = ch
	return p
}
// Buffer sets the capacity of the channels that Run creates between
// consecutive stages. The entry and output channels are not affected.
// It returns p so calls can be chained.
func (p *Pipeline) Buffer(val int) *Pipeline {
	p.buffer = val
	return p
}
// JobSendEnd sends an EndJob on the entry channel, telling the first stage
// that no more jobs follow. The send blocks until the entry channel accepts
// it, and blocks forever if no entry channel has been set. It returns p so
// calls can be chained.
func (p *Pipeline) JobSendEnd() *Pipeline {
	var end EndJob
	p.entry <- end
	return p
}
// SetLogger replaces the logger used for Run's configuration panics and for
// errors returned by stage handlers. It returns p so calls can be chained.
func (p *Pipeline) SetLogger(logger *log.Logger) *Pipeline {
	p.logger = logger
	return p
}
// Wait blocks until every stage started by Run has reached the end of its
// input and finished all of its jobs.
func (p *Pipeline) Wait() {
	p.wg.Wait()
}
// Process appends a stage to the pipeline. Stages run in the order they are
// added, and each one consumes the results of the previous stage. worker
// bounds how many calls to handle run concurrently within this stage. When
// handle returns an error, the error is logged and the job is dropped. It
// returns p so calls can be chained.
func (p *Pipeline) Process(worker int, handle func(interface{}) (interface{}, error)) *Pipeline {
	stage := Workspace{handle: handle, worker: worker}
	p.workspaces = append(p.workspaces, stage)
	return p
}
// Run validates the configuration, wires the stages together and starts them
// in the background. It returns p immediately; call Wait to block until
// every stage has finished.
//
// Run panics (through p.logger) in three cases:
//   - no entry channel is set;
//   - an output channel is set whose capacity is not greater than the
//     entry channel's;
//   - no stage has been added.
func (p *Pipeline) Run() *Pipeline {
	switch {
	case p.entry == nil:
		p.logger.Panic("Missing entry")
	case p.output != nil && cap(p.output) <= cap(p.entry):
		p.logger.Panic("Output channel must big than entry")
	case len(p.workspaces) == 0:
		p.logger.Panic("Workspace at least one")
	}

	// Chain: entry -> (one fresh channel per stage boundary) -> output.
	stages := len(p.workspaces)
	p.connect = append(p.connect, p.entry)
	for k := 1; k < stages; k++ {
		p.connect = append(p.connect, make(chan interface{}, p.buffer))
	}
	p.connect = append(p.connect, p.output)

	// Stage i reads from connect[i] and writes to connect[i+1].
	for i, ws := range p.workspaces {
		p.work(p.connect[i], ws.worker, ws.handle, p.connect[i+1])
	}
	return p
}
// work starts one pipeline stage in a background goroutine.
//
// The stage reads jobs from entry and runs handle on each job in its own
// goroutine, with at most worker handlers in flight at once. Successful
// results go to next; writeNext drops them when next is nil. Errors are
// logged with p.logger and the job is discarded. The stage stops reading
// when it receives an EndJob or when entry is closed. It then waits for
// every in-flight handler and forwards one EndJob to next, so the following
// stage knows no more jobs will arrive. p.wg tracks the stage so that Wait
// can block until it is done.
//
// A worker value below 1 is treated as 1. A zero-capacity semaphore would
// block forever on the first job, because the send happens before any
// handler exists to release it. A negative capacity would make make panic.
func (p *Pipeline) work(entry chan interface{}, worker int, handle func(interface{}) (interface{}, error), next chan interface{}) {
	if worker < 1 {
		worker = 1
	}
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()

		// inflight lets the stage wait for all of its handlers before
		// signalling the end of input downstream.
		var inflight sync.WaitGroup
		// sem bounds the number of concurrently running handlers.
		sem := make(chan struct{}, worker)
		for job := range entry {
			// An EndJob means no more jobs will be sent; stop reading.
			if _, end := job.(EndJob); end {
				break
			}
			sem <- struct{}{}
			inflight.Add(1)
			go func(job interface{}) {
				defer func() {
					inflight.Done()
					<-sem
				}()
				ret, err := handle(job)
				if err != nil {
					p.logger.Println(err.Error())
					return
				}
				p.writeNext(next, ret)
			}(job)
		}
		// Every job of this stage has been handled; tell the next stage.
		inflight.Wait()
		p.writeNext(next, EndJob{})
	}()
}
// writeNext sends v on next. When next is nil it does nothing, which is how
// the results of the last stage are discarded when no output channel is set.
func (p *Pipeline) writeNext(next chan interface{}, v interface{}) {
	if next == nil {
		return
	}
	next <- v
}