// Copyright 2015 Boa Ho Man. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package hbq

import (
	"bufio"
	"bytes"
	"fmt"
	"io/ioutil"
	"os"
	"time"

	"google.golang.org/api/bigquery/v2"

	"github.com/aranair/heka-bigquery/bq"
	. "github.com/mozilla-services/heka/pipeline"
)

// IntervalPeriod is the interval between midnight ticks.
const IntervalPeriod time.Duration = 24 * time.Hour

// Hour, minute and second of the first tick (midnight, local time).
const (
	TickHour   int = 0
	TickMinute int = 0
	TickSecond int = 0
)

// MaxBuffer is the maximum buffer size in bytes before an upload is
// attempted; currently 1000 for testing.
const MaxBuffer = 1000

// A BqOutputConfig holds the information needed to configure the Heka plugin.
//
// ServiceEmail is the service account email found in the Google Developers
// Console. PemFilePath points to a PEM key file generated from the PKCS#12
// (.p12) file that Google issues. SchemaFilePath points to the BigQuery
// schema JSON file; see the example schema file `realtime_log.schema.sample`.
// BufferPath + BufferFile form the full path to the 'backup' file that is
// written to at the same time as the in-memory buffer.
type BqOutputConfig struct {
	ProjectId      string `toml:"project_id"`
	DatasetId      string `toml:"dataset_id"`
	TableId        string `toml:"table_id"`
	ServiceEmail   string `toml:"service_email"`
	PemFilePath    string `toml:"pem_file_path"`
	SchemaFilePath string `toml:"schema_file_path"`
	BufferPath     string `toml:"buffer_path"`
	BufferFile     string `toml:"buffer_file"`
}

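// An illustrative TOML entry for this plugin; every value below is a
// placeholder, not a default:
//
//	[BqOutput]
//	message_matcher = "Type == 'logfile'"
//	project_id = "my-project"
//	dataset_id = "my_dataset"
//	table_id = "realtime_log_"
//	service_email = "1234@developer.gserviceaccount.com"
//	pem_file_path = "/etc/heka/key.pem"
//	schema_file_path = "/etc/heka/realtime_log.schema"
//	buffer_path = "/var/lib/heka/bq"
//	buffer_file = "backup.log"
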
// A BqOutput holds the uploader and schema.
type BqOutput struct {
	schema []byte
	config *BqOutputConfig
	bu     *bq.BqUploader
}

func (bqo *BqOutput) ConfigStruct() interface{} {
	return &BqOutputConfig{}
}

// Init is run by Heka when the plugin is loaded. It reads the PEM and
// schema files and initializes the BqUploader.
func (bqo *BqOutput) Init(config interface{}) (err error) {
	bqo.config = config.(*BqOutputConfig)

	pkey, err := ioutil.ReadFile(bqo.config.PemFilePath)
	if err != nil {
		return fmt.Errorf("error reading pem file: %s", err)
	}
	schema, err := ioutil.ReadFile(bqo.config.SchemaFilePath)
	if err != nil {
		return fmt.Errorf("error reading schema file: %s", err)
	}

	bqo.schema = schema
	bqo.bu = bq.NewBqUploader(pkey, bqo.config.ProjectId, bqo.config.DatasetId,
		bqo.config.ServiceEmail)
	return
}

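// The PEM key can be derived from the .p12 file that Google issues, for
// example (Google-issued .p12 files use the well-known password "notasecret"):
//
//	openssl pkcs12 -in key.p12 -passin pass:notasecret -nodes -nocerts -out key.pem
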
// Run is called by Heka while the plugin is running.
// For more information, visit https://hekad.readthedocs.org/en/latest/developing/plugin.html
func (bqo *BqOutput) Run(or OutputRunner, h PluginHelper) (err error) {
	var (
		// Heka message and its encoded payload.
		pack    *PipelinePack
		payload []byte
		// File used for the backup buffer.
		f *os.File
		// The day whose table is currently being written to; used to tell
		// the old day from the new one when the midnight ticker ticks.
		oldDay time.Time
		// Set when the midnight ticker ticks; used to format the new
		// BigQuery table name.
		now time.Time
		ok  = true
	)

	// Channel that delivers the Heka packs.
	inChan := or.InChan()
	midnightTicker := midnightTickerUpdate()

	// Buffer used to accumulate logs before uploading to BigQuery.
	buf := bytes.NewBuffer(nil)
	fileOp := os.O_CREATE | os.O_APPEND | os.O_WRONLY

	// Ensure the directories exist before saving.
	mkDirectories(bqo.config.BufferPath)
	fp := bqo.config.BufferPath + "/" + bqo.config.BufferFile // full path
	if f, err = os.OpenFile(fp, fileOp, 0666); err != nil {
		logError(or, "Open Backup File", err)
	}

	oldDay = time.Now().Local()

	// Initialize the current day's table.
	if err = bqo.bu.CreateTable(bqo.tableName(oldDay), bqo.schema); err != nil {
		logError(or, "Initialize Table", err)
	}

	encoder := or.Encoder()

	for ok {
		select {
		case pack, ok = <-inChan:
			if !ok {
				break
			}
			if encoder != nil {
				payload, err = or.Encode(pack)
				if err != nil {
					or.LogError(err)
					pack.Recycle()
					continue
				}
			} else {
				payload = []byte(pack.Message.GetPayload())
			}

			// Write to both the backup file and the in-memory buffer.
			if _, err = f.Write(payload); err != nil {
				logError(or, "Write to File", err)
			}
			if _, err = buf.Write(payload); err != nil {
				logError(or, "Write to Buffer", err)
			}
			// The pack is done with once its payload has been written out.
			pack.Recycle()

			// Upload once the buffer exceeds MaxBuffer bytes.
			if buf.Len() > MaxBuffer {
				f.Close() // close file for uploading
				bqo.UploadAndReset(buf, fp, oldDay, or)
				f, _ = os.OpenFile(fp, fileOp, 0666)
			}
		case <-midnightTicker.C:
			now = time.Now().Local()

			// If the buffer is not empty, upload the remaining contents to
			// the old day's table.
			if buf.Len() > 0 {
				f.Close() // close file for uploading
				bqo.UploadAndReset(buf, fp, oldDay, or)
				f, _ = os.OpenFile(fp, fileOp, 0666)
			}

			logUpdate(or, "Midnight! Creating new table: "+bqo.tableName(now))

			// Create a new table for the new day and update the current date.
			if err = bqo.bu.CreateTable(bqo.tableName(now), bqo.schema); err != nil {
				logError(or, "Create New Day Table", err)
			}
			oldDay = now

			// A Ticker repeats at its initial interval rather than daily,
			// so re-arm it to fire at the next midnight.
			midnightTicker.Stop()
			midnightTicker = midnightTickerUpdate()
		}
	}

	f.Close()
	logUpdate(or, "Shutting down BQ output runner.")
	return
}

// Upload prepares the buffered rows and uploads them to the BigQuery table.
// Shared by both the file and the buffer upload paths.
func (bqo *BqOutput) Upload(i interface{}, tableName string) (err error) {
	var data []byte
	list := make([]map[string]bigquery.JsonValue, 0)

	for {
		// readData returns an empty slice once the source is drained.
		data, _ = readData(i)
		if len(data) == 0 {
			break
		}
		list = append(list, bq.BytesToBqJsonRow(data))
	}

	return bqo.bu.InsertRows(tableName, list)
}

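// Payloads are consumed one '\n'-terminated record at a time, so the encoder
// output is expected to be newline-delimited rows. Illustratively (the field
// names here are hypothetical; the real ones come from the schema file):
//
//	{"timestamp": "2015-06-07T00:00:00Z", "level": "info", "msg": "hello"}
//	{"timestamp": "2015-06-07T00:00:01Z", "level": "warn", "msg": "world"}
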
// readData reads one newline-terminated record from either a bytes.Buffer
// or a bufio.Reader.
func readData(i interface{}) (line []byte, err error) {
	switch v := i.(type) {
	case *bytes.Buffer:
		line, err = v.ReadBytes('\n')
	case *bufio.Reader:
		line, err = v.ReadBytes('\n')
	}
	return
}

// UploadAndReset uploads the buffer; if that fails or contains errors, it
// falls back to uploading from the backup file. Afterwards it clears the
// buffer and deletes the backup file.
func (bqo *BqOutput) UploadAndReset(buf *bytes.Buffer, path string, d time.Time, or OutputRunner) {
	tn := bqo.tableName(d)
	logUpdate(or, "Buffer limit reached, uploading "+tn)

	if err := bqo.Upload(buf, tn); err != nil {
		logError(or, "Upload Buffer", err)
		if err := bqo.UploadFile(path, tn); err != nil {
			logError(or, "Upload File", err)
		} else {
			logUpdate(or, "Upload File Successful")
		}
	} else {
		logUpdate(or, "Upload Buffer Successful")
	}

	// Cleanup and reset.
	buf.Reset()
	_ = os.Remove(path)
}

// UploadFile uploads the file at `path` to the BigQuery table.
func (bqo *BqOutput) UploadFile(path string, tableName string) (err error) {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	return bqo.Upload(bufio.NewReader(f), tableName)
}

// formatDate renders a date as YYYYMMDD, e.g. 2015-06-07 becomes "20150607".
func formatDate(t time.Time) string {
	return t.Format("20060102")
}

func logUpdate(or OutputRunner, title string) {
	or.LogMessage(title)
}

func logError(or OutputRunner, title string, err error) {
	or.LogMessage(fmt.Sprintf("%s - Error -: %s", title, err))
}

// exists reports whether the given path exists.
func exists(path string) (bool, error) {
	_, err := os.Stat(path)
	if err == nil {
		return true, nil
	}
	if os.IsNotExist(err) {
		return false, nil
	}
	return false, err
}

// mkDirectories creates the buffer directory if it does not already exist.
// Directories need the execute bit to be traversable, hence 0755.
func mkDirectories(path string) {
	if ok, _ := exists(path); !ok {
		_ = os.MkdirAll(path, 0755)
	}
}

// midnightTickerUpdate returns a ticker whose first tick fires at the next
// midnight (local time), e.g. when started at 15:00 the first tick fires
// nine hours later. The ticker then repeats at that same interval, which is
// why Run re-arms it after every tick.
func midnightTickerUpdate() *time.Ticker {
	now := time.Now()
	nextTick := time.Date(now.Year(), now.Month(), now.Day(),
		TickHour, TickMinute, TickSecond, 0, time.Local)
	if !nextTick.After(now) {
		nextTick = nextTick.Add(IntervalPeriod)
	}
	return time.NewTicker(nextTick.Sub(now))
}

// tableName forms the daily table name by appending the date to TableId,
// e.g. a TableId of "realtime_log_" on 2015-06-07 yields "realtime_log_20150607".
func (bqo *BqOutput) tableName(d time.Time) string {
	return bqo.config.TableId + formatDate(d)
}

func init() {
	RegisterPlugin("BqOutput", func() interface{} {
		return new(BqOutput)
	})
}