-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprocessing.go
217 lines (180 loc) · 4.84 KB
/
processing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package phonelab
// Package phonelab contains interfaces and functionality for building
// log-processing pipelines. The basic interface is Processor, which abstracts
// any processing component that can generate data. Concrete implementations
// are provided for a muxer, a demuxer, a simple I/O processor, and multiple
// log sources.
import (
"fmt"
"log"
"regexp"
"sync"
)
// Processor is the basic interface that all PhoneLab pipeline operators
// implement. When Process is invoked, the Processor should create and return
// a receive-only channel on which its output items will be transmitted.
//
// The implementations in this file close the returned channel once their
// input is exhausted, and the consumers in this file range over the channel
// until it is closed.
type Processor interface {
Process() <-chan interface{}
}
// LogHandler receives a callback for each new log item that arrives and is
// ready to be processed.
//
// As used by SimpleProcessor, Handle is called once per item read from the
// source channel; a nil return value means the item is dropped, and any
// non-nil return value is forwarded on the output channel. Finish is called
// once, after the source channel has been closed and drained.
type LogHandler interface {
Handle(log interface{}) interface{}
Finish()
}
// SimpleProcessor is a Processor with a single source and a single output
// channel. For each log on the in-channel, the log is passed to the
// processor's LogHandler, which performs some operation on the log. If the
// LogHandler returns a non-nil value, this value is written to the output
// channel.
type SimpleProcessor struct {
// Handler is invoked for every item read from Source; it must be non-nil
// by the time Process is called.
Handler LogHandler
// Source is the upstream Processor whose output is consumed; it must be
// non-nil by the time Process is called.
Source Processor
}
// NewSimpleProcessor builds a SimpleProcessor that reads from source and
// passes every item to handler. Neither argument is validated here;
// SimpleProcessor.Process panics if either one is nil.
func NewSimpleProcessor(source Processor, handler LogHandler) *SimpleProcessor {
	sp := new(SimpleProcessor)
	sp.Source = source
	sp.Handler = handler
	return sp
}
// Process starts the processor and returns its output channel.
//
// A goroutine reads every item from the source's channel and hands it to the
// handler; each non-nil result is sent on the returned channel. Once the
// source channel is closed, the handler's Finish method is called and the
// returned channel is closed.
//
// Process panics if Handler or Source is nil.
func (proc *SimpleProcessor) Process() <-chan interface{} {
	results := make(chan interface{})

	switch {
	case proc.Handler == nil:
		panic("SimpleProcessor handler cannot be nil!")
	case proc.Source == nil:
		panic("SimpleProcessor source cannot be nil!")
	}

	go func() {
		for item := range proc.Source.Process() {
			handled := proc.Handler.Handle(item)
			if handled == nil {
				// The handler chose to drop this item.
				continue
			}
			results <- handled
		}
		// Input exhausted: let the handler wrap up before signalling
		// downstream consumers that no more output is coming.
		proc.Handler.Finish()
		close(results)
	}()

	return results
}
// Muxer multiplexes log lines/objects onto multiple output channels from a
// single source: every item read from Source is sent to every output channel.
type Muxer struct {
// Source is the single upstream Processor that is fanned out.
Source Processor
// dest holds one output channel per call to Process.
dest []chan interface{}
// numDest is the number of Process calls expected before fan-out starts.
numDest int
// l guards dest while Process calls register their output channels.
l sync.Mutex
}
// NewMuxer builds a Muxer that fans source out to numDest consumers. The
// muxer does not start reading from source until Process has been called
// numDest times.
func NewMuxer(source Processor, numDest int) *Muxer {
	mux := new(Muxer)
	mux.Source = source
	mux.numDest = numDest
	mux.dest = []chan interface{}{}
	return mux
}
// Process registers one more consumer and returns that consumer's private
// output channel.
//
// Process is expected to be invoked once per downstream processor, numDest
// times in total. Each call gets its own channel. Reading from Source only
// begins on the final (numDest-th) call, so that no consumer misses items.
// From then on, every item from Source is sent to every registered channel
// with blocking, sequential sends, so each consumer must keep draining its
// channel. When Source's channel is closed, all output channels are closed.
//
// Process panics if it is called more than numDest times.
func (m *Muxer) Process() <-chan interface{} {
	m.l.Lock()
	defer m.l.Unlock()

	// Validate before mutating: if the caller recovers from this panic, the
	// destination list must not have gained a channel that nobody reads,
	// otherwise the fan-out goroutine would block on it forever.
	if len(m.dest) >= m.numDest {
		panic("Muxer: More invocations than destinations")
	}

	outChan := make(chan interface{})
	m.dest = append(m.dest, outChan)
	if len(m.dest) < m.numDest {
		// Not there yet; wait for the remaining consumers to register.
		return outChan
	}

	// All consumers are registered. Hand the goroutine a fixed snapshot of
	// the destinations so it never reads m.dest outside the lock.
	dests := m.dest
	go func() {
		inChan := m.Source.Process()
		for log := range inChan {
			// Multiplex current message. For now, blocking non-concurrent sends.
			for _, c := range dests {
				c <- log
			}
		}
		for _, c := range dests {
			close(c)
		}
	}()
	return outChan
}
// Demuxer takes input from multiple sources and funnels it down a single
// output channel. Each source is drained by its own goroutine, so items from
// different sources arrive on the output in whatever order those goroutines
// happen to send them.
type Demuxer struct {
// Sources are the upstream Processors whose outputs are merged.
Sources []Processor
}
// NewDemuxer builds a Demuxer that merges the output of the given sources.
// The slice is stored as-is, not copied.
func NewDemuxer(sources []Processor) *Demuxer {
	dm := new(Demuxer)
	dm.Sources = sources
	return dm
}
// Process starts every source and returns a single channel carrying the
// items produced by all of them.
//
// Each source is drained by its own goroutine, which forwards items to the
// shared output channel with blocking sends. The output channel is closed
// once every source's channel has been closed and fully forwarded.
func (dm *Demuxer) Process() <-chan interface{} {
	merged := make(chan interface{})
	var pending sync.WaitGroup

	// forward drains one source into the merged channel.
	forward := func(src Processor) {
		for item := range src.Process() {
			merged <- item
		}
		pending.Done()
	}

	// Coordinator: launch one forwarder per source, then close the merged
	// channel after the last forwarder has finished.
	go func() {
		for _, src := range dm.Sources {
			pending.Add(1)
			go forward(src)
		}
		pending.Wait()
		close(merged)
	}()

	return merged
}
// StringFilter is a predicate over a single string log line. It reports
// whether the line is accepted.
type StringFilter func(string) bool

// StringFilterHandler is a log handler that keeps a string line if at least
// one of its Filters accepts it.
type StringFilterHandler struct {
	Filters []StringFilter
}

// Handle returns log unchanged when any filter accepts it, and nil when no
// filter does (including when Filters is empty). Filters are tried in order
// and evaluation stops at the first one that accepts.
//
// Handle panics if log is not a string.
func (p *StringFilterHandler) Handle(log interface{}) interface{} {
	line, isString := log.(string)
	if !isString {
		panic(fmt.Sprintf("String filter got non-string object: %T", log))
	}

	for i := range p.Filters {
		if p.Filters[i](line) {
			return log
		}
	}

	// No filter accepted the line.
	return nil
}

// Finish is a no-op; the handler keeps no state that needs flushing.
func (p *StringFilterHandler) Finish() {
}
// NewStringFilterProcessor wraps source in a SimpleProcessor whose handler is
// a StringFilterHandler built from filters, so only lines accepted by at
// least one filter are passed downstream.
func NewStringFilterProcessor(source Processor, filters []StringFilter) Processor {
	handler := &StringFilterHandler{Filters: filters}
	return NewSimpleProcessor(source, handler)
}
// makeRegexFilter compiles regexStr once and returns a StringFilter that
// accepts any string containing a match for it. It panics if regexStr is not
// a valid regular expression.
func makeRegexFilter(regexStr string) StringFilter {
	// The bound method value holds on to the compiled expression, so the
	// pattern is compiled only once, here.
	return regexp.MustCompile(regexStr).MatchString
}
// LoglineProcessorHandler is a log handler that converts raw string lines
// into parsed values by passing each one to Parser.
type LoglineProcessorHandler struct {
// Parser parses each incoming line; it is dereferenced on every Handle
// call.
Parser *LoglineParser
}
// Handle parses a single raw log line and returns the parsed result.
//
// logline must be a string; any other dynamic type causes the type assertion
// to panic. If parsing fails, the error is logged and nil is returned so that
// the line is dropped rather than forwarded.
func (p *LoglineProcessorHandler) Handle(logline interface{}) interface{} {
	line := logline.(string)
	ll, err := p.Parser.Parse(line)
	if err != nil {
		log.Printf("Error parsing line: %v\n", err)
		// Return a literal nil rather than ll.
		// NOTE(review): Parse is defined elsewhere; if it returns a pointer
		// type, its nil result would be wrapped in a non-nil interface value
		// and a caller checking `res != nil` would forward it downstream.
		return nil
	}
	return ll
}
// Finish is a no-op; the handler has nothing to flush or release.
func (p *LoglineProcessorHandler) Finish() {
}
// NewLoglineProcessor wraps source in a SimpleProcessor whose handler parses
// each string line with parser.
func NewLoglineProcessor(source Processor, parser *LoglineParser) Processor {
	handler := &LoglineProcessorHandler{Parser: parser}
	return NewSimpleProcessor(source, handler)
}