rules: Mark deleted rule series as stale after a reload (prometheus#6745)

* rules: Mark deleted rule series as stale after a reload

Signed-off-by: Julien Pivotto <[email protected]>
Julien Pivotto authored Feb 12, 2020
1 parent c35438f commit 135cc30
Showing 4 changed files with 305 additions and 56 deletions.
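
For orientation before the diffs: the change appends one final sample carrying the StaleNaN marker to every series that a deleted (or renamed) rule produced in its last evaluation, so those series end cleanly at the reload instead of lingering until the usual five-minute staleness cutoff. The stand-alone sketch below only illustrates that core idea; it is not code from this commit, and the package name, helper name, and arguments are invented for the example, but the Appender calls and error handling mirror cleanupStaleSeries in the manager.go diff further down.

// Package sketch is a hypothetical illustration, not part of this commit.
package sketch

import (
	"math"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/timestamp"
	"github.com/prometheus/prometheus/pkg/value"
	"github.com/prometheus/prometheus/storage"
)

// markStale appends a StaleNaN sample at time ts for every series in prev,
// i.e. the series produced by rules that no longer exist after a reload.
func markStale(app storage.Appender, logger log.Logger, prev []labels.Labels, ts time.Time) {
	for _, lset := range prev {
		_, err := app.Add(lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
		switch err {
		case nil:
		case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
			// Expected when a renamed rule has already taken over the series.
		default:
			level.Warn(logger).Log("msg", "adding stale sample failed", "sample", lset, "err", err)
		}
	}
	if err := app.Commit(); err != nil {
		level.Warn(logger).Log("msg", "committing stale samples failed", "err", err)
	}
}
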
5 changes: 5 additions & 0 deletions rules/fixtures/rules2_copy.yaml
@@ -0,0 +1,5 @@
groups:
- name: test_2 copy
rules:
- record: test_2
expr: vector(2)
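
This fixture appears to duplicate an existing fixture's test_2 recording rule under a new group name, which matches the rename scenario this commit cares about: after a reload that swaps the files, the old group is stopped and its series are scheduled for staleness marking, while the renamed group keeps writing the same test_2 series (which is exactly why the code below waits two intervals and tolerates duplicate-sample errors). A hypothetical reload call that would exercise it (mgr and the fixture path are placeholders; the actual test changes for this commit are in files not rendered below):

// Hypothetical illustration only; the Manager is assumed to be running already.
package sketch

import (
	"log"
	"time"

	"github.com/prometheus/prometheus/rules"
)

func reloadWithCopiedGroup(mgr *rules.Manager) {
	// Reloading with only the copied fixture drops the original group; its
	// series receive StaleNaN markers within two evaluation intervals, while
	// the renamed group keeps producing the same test_2 series.
	if err := mgr.Update(time.Second, []string{"fixtures/rules2_copy.yaml"}, nil); err != nil {
		log.Fatal(err)
	}
}
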
166 changes: 115 additions & 51 deletions rules/manager.go
@@ -231,37 +231,48 @@ type Group struct {

shouldRestore bool

done chan struct{}
terminated chan struct{}
done chan bool
terminated chan struct{}
managerDone chan struct{}

logger log.Logger

metrics *Metrics
}

type GroupOptions struct {
Name, File string
Interval time.Duration
Rules []Rule
ShouldRestore bool
Opts *ManagerOptions
done chan struct{}
}

// NewGroup makes a new Group with the given name, options, and rules.
func NewGroup(name, file string, interval time.Duration, rules []Rule, shouldRestore bool, opts *ManagerOptions) *Group {
metrics := opts.Metrics
func NewGroup(o GroupOptions) *Group {
metrics := o.Opts.Metrics
if metrics == nil {
metrics = NewGroupMetrics(opts.Registerer)
metrics = NewGroupMetrics(o.Opts.Registerer)
}

metrics.groupLastEvalTime.WithLabelValues(groupKey(file, name))
metrics.groupLastDuration.WithLabelValues(groupKey(file, name))
metrics.groupRules.WithLabelValues(groupKey(file, name)).Set(float64(len(rules)))
metrics.groupInterval.WithLabelValues(groupKey(file, name)).Set(interval.Seconds())
metrics.groupLastEvalTime.WithLabelValues(groupKey(o.File, o.Name))
metrics.groupLastDuration.WithLabelValues(groupKey(o.File, o.Name))
metrics.groupRules.WithLabelValues(groupKey(o.File, o.Name)).Set(float64(len(o.Rules)))
metrics.groupInterval.WithLabelValues(groupKey(o.File, o.Name)).Set(o.Interval.Seconds())

return &Group{
name: name,
file: file,
interval: interval,
rules: rules,
shouldRestore: shouldRestore,
opts: opts,
seriesInPreviousEval: make([]map[string]labels.Labels, len(rules)),
done: make(chan struct{}),
name: o.Name,
file: o.File,
interval: o.Interval,
rules: o.Rules,
shouldRestore: o.ShouldRestore,
opts: o.Opts,
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
done: make(chan bool),
managerDone: o.done,
terminated: make(chan struct{}),
logger: log.With(opts.Logger, "group", name),
logger: log.With(o.Opts.Logger, "group", o.Name),
metrics: metrics,
}
}
@@ -314,14 +325,38 @@ func (g *Group) run(ctx context.Context) {
tick := time.NewTicker(g.interval)
defer tick.Stop()

makeStale := func(s bool) {
if !s {
return
}
go func(now time.Time) {
for _, rule := range g.seriesInPreviousEval {
for _, r := range rule {
g.staleSeries = append(g.staleSeries, r)
}
}
// The old series map can be garbage collected at this point.
g.seriesInPreviousEval = nil
// Wait for two evaluation intervals before writing the staleness
// markers, to give renamed rules a chance to insert new series into
// the TSDB. At this point, any renamed rule has already been started
// by the reload.
select {
case <-g.managerDone:
case <-time.After(2 * g.interval):
g.cleanupStaleSeries(now)
}
}(time.Now())
}

iter()
if g.shouldRestore {
// If we have to restore, we wait for another Eval to finish.
// The reason behind this is, during first eval (or before it)
// we might not have enough data scraped, and recording rules would not
// have updated the latest values, on which some alerts might depend.
select {
case <-g.done:
case stale := <-g.done:
makeStale(stale)
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
@@ -339,11 +374,13 @@ func (g *Group) run(ctx context.Context) {

for {
select {
case <-g.done:
case stale := <-g.done:
makeStale(stale)
return
default:
select {
case <-g.done:
case stale := <-g.done:
makeStale(stale)
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
@@ -358,6 +395,11 @@ func (g *Group) run(ctx context.Context) {
}
}

func (g *Group) stopAndMakeStale() {
g.done <- true
<-g.terminated
}

func (g *Group) stop() {
close(g.done)
<-g.terminated
@@ -596,31 +638,35 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
}
}(i, rule)
}
g.cleanupStaleSeries(ts)
}

if len(g.staleSeries) != 0 {
app, err := g.opts.Appendable.Appender()
if err != nil {
level.Warn(g.logger).Log("msg", "creating appender failed", "err", err)
return
}
for _, s := range g.staleSeries {
// Rule that produced series no longer configured, mark it stale.
_, err = app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
switch err {
case nil:
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if series
// is exposed from a different rule.
default:
level.Warn(g.logger).Log("msg", "adding stale sample for previous configuration failed", "sample", s, "err", err)
}
}
if err := app.Commit(); err != nil {
level.Warn(g.logger).Log("msg", "stale sample appending for previous configuration failed", "err", err)
} else {
g.staleSeries = nil
func (g *Group) cleanupStaleSeries(ts time.Time) {
if len(g.staleSeries) == 0 {
return
}
app, err := g.opts.Appendable.Appender()
if err != nil {
level.Warn(g.logger).Log("msg", "creating appender failed", "err", err)
return
}
for _, s := range g.staleSeries {
// Rule that produced series no longer configured, mark it stale.
_, err = app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
switch err {
case nil:
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if series
// is exposed from a different rule.
default:
level.Warn(g.logger).Log("msg", "adding stale sample for previous configuration failed", "sample", s, "err", err)
}
}
if err := app.Commit(); err != nil {
level.Warn(g.logger).Log("msg", "stale sample appending for previous configuration failed", "err", err)
} else {
g.staleSeries = nil
}
}

// RestoreForState restores the 'for' state of the alerts
@@ -784,6 +830,7 @@ type Manager struct {
groups map[string]*Group
mtx sync.RWMutex
block chan struct{}
done chan struct{}
restored bool

logger log.Logger
@@ -825,6 +872,7 @@ func NewManager(o *ManagerOptions) *Manager {
groups: map[string]*Group{},
opts: o,
block: make(chan struct{}),
done: make(chan struct{}),
logger: o.Logger,
}

@@ -848,6 +896,10 @@ func (m *Manager) Stop() {
eg.stop()
}

// Shut down the groups that are still waiting multiple evaluation
// intervals before writing their staleness markers.
close(m.done)

level.Info(m.logger).Log("msg", "Rule manager stopped")
}

@@ -899,14 +951,18 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels
}

// Stop remaining old groups.
wg.Add(len(m.groups))
for n, oldg := range m.groups {
oldg.stop()
if m := oldg.metrics; m != nil {
m.groupInterval.DeleteLabelValues(n)
m.groupLastEvalTime.DeleteLabelValues(n)
m.groupLastDuration.DeleteLabelValues(n)
m.groupRules.DeleteLabelValues(n)
}
go func(n string, g *Group) {
g.stopAndMakeStale()
if m := g.metrics; m != nil {
m.groupInterval.DeleteLabelValues(n)
m.groupLastEvalTime.DeleteLabelValues(n)
m.groupLastDuration.DeleteLabelValues(n)
m.groupRules.DeleteLabelValues(n)
}
wg.Done()
}(n, oldg)
}

wg.Wait()
@@ -962,7 +1018,15 @@ func (m *Manager) LoadGroups(
))
}

groups[groupKey(fn, rg.Name)] = NewGroup(rg.Name, fn, itv, rules, shouldRestore, m.opts)
groups[groupKey(fn, rg.Name)] = NewGroup(GroupOptions{
Name: rg.Name,
File: fn,
Interval: itv,
Rules: rules,
ShouldRestore: shouldRestore,
Opts: m.opts,
done: m.done,
})
}
}
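
For completeness, the constructor change is the part most visible to code outside this file: NewGroup no longer takes positional arguments but a GroupOptions literal, as the LoadGroups hunk above shows. A minimal migration sketch (the rule list and manager options are placeholders; the unexported done field can only be wired up by the Manager itself):

// Hypothetical migration sketch, not part of this commit.
package sketch

import (
	"time"

	"github.com/prometheus/prometheus/rules"
)

func newExampleGroup(ruleList []rules.Rule, managerOpts *rules.ManagerOptions) *rules.Group {
	// Before this commit:
	//   return rules.NewGroup("example", "example.rules.yml", time.Minute, ruleList, false, managerOpts)

	// After this commit:
	return rules.NewGroup(rules.GroupOptions{
		Name:          "example",
		File:          "example.rules.yml",
		Interval:      time.Minute,
		Rules:         ruleList,
		ShouldRestore: false,
		Opts:          managerOpts,
	})
}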
