-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathclickhouse_table_writer.go
149 lines (124 loc) · 2.86 KB
/
clickhouse_table_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package bristle
import (
"sync"
"time"
"github.com/rs/zerolog/log"
)
// writerGroup is a group of ClickhouseTableWriters which are managed as one
// unit. This allows for dynamic configuration reload by swapping the
// underlying writer group.
type writerGroup struct {
// Add, Start and Close take the write lock while touching writers/size.
sync.RWMutex
// Writers registered via Add; Start launches each of them.
writers []*ClickhouseTableWriter
// Closed by Close to tell every running writer to stop.
done chan struct{}
// Each writer goroutine sends one value here after its run loop returns.
cleanup chan struct{}
// Number of writers launched by Start; Close's background goroutine waits
// for this many cleanup signals.
size int
}
// ClickhouseTableWriter periodically flushes rows from its in-memory buffer
// into a single ClickHouse table.
type ClickhouseTableWriter struct {
// Destination table; also supplies the flush interval, batch size and
// buffer configuration.
table *ClickhouseTable
// Rows waiting to be written by the flush loop.
buffer *MemoryRowBuffer
}
// newWriterGroup returns an empty writerGroup: no writers, a size of zero and
// freshly made, unbuffered done and cleanup channels.
func newWriterGroup() *writerGroup {
	g := new(writerGroup)
	g.writers = []*ClickhouseTableWriter{}
	g.done = make(chan struct{})
	g.cleanup = make(chan struct{})
	// size is left at its zero value.
	return g
}
// NewClickhouseTableWriter builds a writer for table backed by a new in-memory
// row buffer. The buffer is named after the table and configured with the
// table's MaxBufferSize and OnFull settings. Any error from creating the
// buffer is returned unchanged, together with a nil writer.
func NewClickhouseTableWriter(table *ClickhouseTable) (*ClickhouseTableWriter, error) {
	bufferName := string(table.Name)
	rows, err := NewMemoryBuffer(bufferName, table.config.MaxBufferSize, table.config.OnFull)
	if err != nil {
		return nil, err
	}
	return &ClickhouseTableWriter{buffer: rows, table: table}, nil
}
// Add registers writer with the group so that a later Start launches it. The
// writer is also appended to its table's own writers list. Both updates
// happen while holding the group's write lock.
//
// NOTE(review): nothing in this file removes entries from table.writers, so
// writers from a replaced group presumably stay listed there — verify against
// the rest of the package.
func (c *writerGroup) Add(writer *ClickhouseTableWriter) {
	c.Lock()
	defer c.Unlock()

	tbl := writer.table
	tbl.writers = append(tbl.writers, writer)
	c.writers = append(c.writers, writer)
}
// Close signals every writer in the group to stop by closing the shared done
// channel. It does not block. A background goroutine waits for each writer
// launched by Start to report on the cleanup channel, then logs that all
// writers have shut down.
//
// Calling Close more than once is a no-op; previously it panicked on closing
// an already-closed channel.
func (c *writerGroup) Close() {
	c.Lock()
	defer c.Unlock()

	// Only Close closes done, and it does so under the write lock, so this
	// check-then-close cannot race with another Close.
	select {
	case <-c.done:
		return
	default:
	}
	close(c.done)

	// Snapshot size while holding the lock. Reading c.size from the goroutine
	// below would be a data race with Start, which updates it under the lock.
	size := c.size
	go func() {
		log.Info().Int("writers", size).Msg("writer-group: waiting for all writers to shutdown")
		for i := 0; i < size; i++ {
			<-c.cleanup
		}
		log.Info().Msg("writer-group: all writers have shutdown, goodbye")
	}()
}
// Start launches every writer registered via Add, wiring each one to the
// group's done and cleanup channels. Each launched writer is counted in size,
// so Close knows how many exit signals to wait for.
//
// NOTE(review): calling Start twice launches every writer a second time and
// double-counts size.
func (c *writerGroup) Start() {
	c.Lock()
	defer c.Unlock()

	for i := range c.writers {
		c.writers[i].Start(c.done, c.cleanup)
		c.size++
	}
}
// Start runs the writer's flush loop on a new goroutine. After the loop
// returns (once done is closed), the goroutine sends exactly one value on
// cleanup so the owner can count finished writers. With an unbuffered cleanup
// channel, as newWriterGroup creates, that send blocks until it is received.
func (c *ClickhouseTableWriter) Start(done chan struct{}, cleanup chan struct{}) {
	worker := func() {
		c.run(done)
		cleanup <- struct{}{}
	}
	go worker()
}
// run is the writer's flush loop. On every tick (every FlushInterval
// milliseconds) it pulls up to MaxBatchSize rows from the buffer and writes
// them with writeBatch. Write errors are logged and the loop carries on. Once
// done is closed it performs one final flush and returns.
//
// NOTE(review): time.NewTicker panics on a non-positive interval, so this
// assumes FlushInterval is validated to be > 0 — confirm in the config
// loader. Only a single batch is flushed at shutdown. Rows beyond
// MaxBatchSize still buffered at that point are presumably not written here.
func (c *ClickhouseTableWriter) run(done chan struct{}) {
	ticker := time.NewTicker(time.Duration(c.table.config.FlushInterval) * time.Millisecond)
	// Release the ticker once the loop exits; previously it was never stopped.
	defer ticker.Stop()

	flush := func() {
		batch := c.buffer.FlushBatch(c.table.config.MaxBatchSize)
		// Skip empty batches so an idle tick never opens a transaction.
		if len(batch) == 0 {
			return
		}
		if err := c.writeBatch(batch); err != nil {
			log.Error().Err(err).Str("table", string(c.table.Name)).Int("batch-size", len(batch)).Msg("clickhouse-table-writer: failed to write batch")
		}
	}

	for {
		select {
		case <-ticker.C:
			flush()
		case <-done:
			// React to shutdown immediately instead of waiting for the next
			// tick, but still give buffered rows one last write.
			flush()
			return
		}
	}
}
// writeBatch inserts every row of batch into the table in one transaction.
// It uses the table's cached INSERT statement with one Exec per row. The
// connection is obtained from the table's cluster and released back to it
// when writeBatch returns. The first error encountered is returned.
//
// Any failure before Commit now rolls the transaction back. This assumes
// conn.Begin returns a database/sql *Tx (it exposes Prepare and Commit);
// such a Tx that is never committed or rolled back keeps its underlying
// connection pinned indefinitely.
func (c *ClickhouseTableWriter) writeBatch(batch [][]interface{}) error {
	conn, err := c.table.cluster.GetConn()
	if err != nil {
		return err
	}
	defer c.table.cluster.ReleaseConn(conn)

	tx, err := conn.Begin()
	if err != nil {
		return err
	}
	// committing flips to true right before Commit. Until then every early
	// return must roll back. After Commit (successful or not) the Tx is
	// already finished, so no rollback is attempted.
	committing := false
	defer func() {
		if !committing {
			// Best effort: the caller needs the original error, not this one.
			_ = tx.Rollback()
		}
	}()

	stmt, err := tx.Prepare(c.table.cachedInsertQuery)
	if err != nil {
		return err
	}
	defer stmt.Close()

	for _, row := range batch {
		if _, err := stmt.Exec(row...); err != nil {
			return err
		}
	}

	committing = true
	if err := tx.Commit(); err != nil {
		// NB: for whatever reason clickhouse-go does not handle this well and
		// leaks connections
		conn.Close()
		return err
	}
	return nil
}