-
-
Notifications
You must be signed in to change notification settings - Fork 66
/
collector.go
156 lines (140 loc) · 4.97 KB
/
collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package sql_exporter
import (
"context"
"database/sql"
"fmt"
"log/slog"
"sync"
"time"
"github.com/burningalchemist/sql_exporter/config"
"github.com/burningalchemist/sql_exporter/errors"
dto "github.com/prometheus/client_model/go"
)
// Collector is a self-contained group of SQL queries, together with the metric families they populate, that is
// collected from one database. It plays the same role as a prometheus.Collector.
type Collector interface {
	// Collect is the counterpart of prometheus.Collector.Collect(), except that it runs within ctx, queries db,
	// and emits its results on ch.
	Collect(ctx context.Context, db *sql.DB, ch chan<- Metric)
}
// collector is the plain, uncached Collector implementation: it holds every query built from one collector
// configuration and runs them against whatever database is handed to Collect.
type collector struct {
	config     *config.CollectorConfig // configuration the queries and metric families were built from
	queries    []*Query                // one entry per distinct query referenced by config.Metrics
	logContext string                  // log context string, including the collector name
}
// NewCollector builds a Collector from the given collector configuration. Every metric family it creates has the
// provided const labels applied.
//
// Metrics that share a query are grouped so each distinct query is built (and later executed) only once. If
// cc.MinInterval is non-zero, the returned Collector is a caching wrapper that reuses earlier results within that
// interval. The first error from building a metric family or query is returned.
func NewCollector(logContext string, cc *config.CollectorConfig, constLabels []*dto.LabelPair) (Collector, errors.WithContext) {
	logContext = TrimMissingCtx(fmt.Sprintf(`%s,collector=%s`, logContext, cc.Name))

	// Map each query to the metric families it populates. queryOrder remembers each query the first time it is
	// seen, so queries are built in configuration order rather than Go's randomized map iteration order. This
	// keeps construction, and the error reported when several queries are invalid, deterministic across runs.
	queryMFs := make(map[*config.QueryConfig][]*MetricFamily, len(cc.Metrics))
	queryOrder := make([]*config.QueryConfig, 0, len(cc.Metrics))
	for _, mc := range cc.Metrics {
		mf, err := NewMetricFamily(logContext, mc, constLabels)
		if err != nil {
			return nil, err
		}
		qc := mc.Query()
		if _, seen := queryMFs[qc]; !seen {
			queryOrder = append(queryOrder, qc)
		}
		queryMFs[qc] = append(queryMFs[qc], mf)
	}

	// Instantiate one query per distinct query config, in first-seen order.
	queries := make([]*Query, 0, len(queryOrder))
	for _, qc := range queryOrder {
		q, err := NewQuery(logContext, qc, queryMFs[qc]...)
		if err != nil {
			return nil, err
		}
		queries = append(queries, q)
	}

	c := collector{
		config:     cc,
		queries:    queries,
		logContext: logContext,
	}
	if c.config.MinInterval > 0 {
		slog.Warn("Non-zero min_interval, using cached collector.", "logContext", logContext, "min_interval", c.config.MinInterval)
		return newCachingCollector(&c), nil
	}
	return &c, nil
}
// Collect implements Collector. Each query runs in its own goroutine, all writing to ch. Collect blocks until
// every one of them has finished.
func (c *collector) Collect(ctx context.Context, conn *sql.DB, ch chan<- Metric) {
	var pending sync.WaitGroup
	for _, query := range c.queries {
		pending.Add(1)
		go func(qry *Query) {
			defer pending.Done()
			qry.Collect(ctx, conn, ch)
		}(query)
	}
	// Don't return until every query goroutine is done.
	pending.Wait()
}
// newCachingCollector wraps rawColl in a cachingCollector whose minimum refresh interval is taken from
// rawColl.config.MinInterval.
func newCachingCollector(rawColl *collector) Collector {
	sem := make(chan time.Time, 1)
	// Seed the one-slot semaphore with the zero time. The first Collect therefore acquires it immediately and
	// sees a maximally old cache.
	sem <- time.Time{}
	return &cachingCollector{
		rawColl:     rawColl,
		minInterval: time.Duration(rawColl.config.MinInterval),
		cacheSem:    sem,
	}
}
// cachingCollector is a Collector that keeps the metrics produced by a wrapped collector and can hand them out
// again instead of re-running the queries. It is only used when min_interval is non-zero.
type cachingCollector struct {
	rawColl     *collector    // wrapped collector whose output is cached
	minInterval time.Duration // convenience copy of rawColl.config.MinInterval

	// cacheSem is a one-slot channel used as a lock around cache: receiving acquires it and sending releases it.
	// The value it carries is the time at which cache was populated.
	cacheSem chan time.Time

	cache []Metric // metrics saved from the last fresh collection
}
// Collect implements Collector.
//
// Only one caller at a time may use the cache; cacheSem serves as the lock. A caller whose ctx is already done,
// or ends while waiting for the lock, receives a single invalid metric instead.
//
// Once the lock is held, the cached metrics are replayed to ch if they are non-empty and no older than
// minInterval. Otherwise the wrapped collector is run, each metric it produces is forwarded to ch, and the result
// becomes the new cache. The cache is replaced only if that collection completed without ctx being cancelled.
func (cc *cachingCollector) Collect(ctx context.Context, conn *sql.DB, ch chan<- Metric) {
	if ctx.Err() != nil {
		ch <- NewInvalidMetric(errors.Wrap(cc.rawColl.logContext, ctx.Err()))
		return
	}

	collTime := time.Now()
	select {
	case cacheTime := <-cc.cacheSem:
		// Have the lock. cc.cache may only be read or written from here until the timestamp is sent back.
		slog.Debug("Cache size", "logContext", cc.rawColl.logContext, "length", len(cc.cache))
		age := collTime.Sub(cacheTime)
		if age <= cc.minInterval && len(cc.cache) > 0 {
			slog.Debug("Returning cached metrics", "logContext", cc.rawColl.logContext, "min_interval", cc.minInterval.Seconds(), "cache_age", age.Seconds())
			for _, metric := range cc.cache {
				ch <- metric
			}
		} else {
			// Cache contents are older than minInterval (or empty): collect fresh metrics, pipe them through and
			// cache them, unless the collection was cut short by ctx.
			slog.Debug("Collecting fresh metrics", "logContext", cc.rawColl.logContext, "min_interval", cc.minInterval.Seconds(), "cache_age", age.Seconds())
			if fresh, ok := cc.collectFresh(ctx, conn, ch); ok {
				cc.cache = fresh
				cacheTime = collTime
			}
		}
		// Always replace the value in the semaphore channel, releasing the lock.
		cc.cacheSem <- cacheTime
	case <-ctx.Done():
		// Context closed while waiting for the lock: record an error and return.
		ch <- NewInvalidMetric(errors.Wrap(cc.rawColl.logContext, ctx.Err()))
	}
}

// collectFresh runs the wrapped collector, forwards every metric it produces to ch, and returns the forwarded
// metrics. The caller must hold cacheSem.
//
// ok is false when ctx was cancelled before the collection finished. In that case a single invalid metric is
// sent in place of everything produced after the cancellation, and the partial result must not be cached.
func (cc *cachingCollector) collectFresh(ctx context.Context, conn *sql.DB, ch chan<- Metric) (fresh []Metric, ok bool) {
	cacheChan := make(chan Metric, capMetricChan)
	go func() {
		cc.rawColl.Collect(ctx, conn, cacheChan)
		close(cacheChan)
	}()

	fresh = make([]Metric, 0, len(cc.cache))
	reported := false
	// Keep ranging until cacheChan is closed, even after cancellation. Otherwise the producer goroutine could
	// block forever sending into a full channel.
	for metric := range cacheChan {
		if ctx.Err() != nil {
			if !reported {
				slog.Debug("Context closed, returning invalid metric", "logContext", cc.rawColl.logContext)
				ch <- NewInvalidMetric(errors.Wrap(cc.rawColl.logContext, ctx.Err()))
				reported = true
			}
			continue
		}
		fresh = append(fresh, metric)
		ch <- metric
	}
	// A context's Err() stays non-nil once set. This check therefore also catches a cancellation that landed
	// after the last metric was received.
	return fresh, ctx.Err() == nil
}