Skip to content

Commit

Permalink
Refactoring for better Prometheus' logic support
Browse files Browse the repository at this point in the history
1) Breaking changes! Updated collection code for proper Prometheus support
- Metric names of all subcollectors now have prefixes:
  nsq_topic_, nsq_channel_, nsq_client_
  As a result, the metrics registration code now passes the Prometheus client's validation,
  which also fixes scrape warnings
- "type" label in exported metrics removed as obsolete
2) Reset gauges before each scrape (as haproxy-exporter does)
3) Refactor: subcollector logic is simplified and support for multiple collectors is removed.
   We always use the 'stats' collector, so this removes unnecessary flexibility and complexity.
  • Loading branch information
nordicdyno committed Feb 13, 2017
1 parent 111f5d1 commit f5cc502
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 151 deletions.
10 changes: 7 additions & 3 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package collector

import "github.com/prometheus/client_golang/prometheus"

// Collector defines the interface for collecting all metrics for Prometheus.
type Collector interface {
Collect(out chan<- prometheus.Metric) error
// StatsCollector defines the interface for collecting one specific group
// of stats from the stats data exported by nsqd.
//
// The executor drives an implementation in a fixed order on every scrape:
// reset, then set with the freshly fetched stats, then collect.
type StatsCollector interface {
// set updates the collector's metrics from the given nsqd stats.
set(s *stats)
// collect sends the collector's current metrics to out.
collect(out chan<- prometheus.Metric)
// describe sends the descriptors of the collector's metrics to ch.
describe(ch chan<- *prometheus.Desc)
// reset clears the collector's metrics so values from a previous
// scrape are not reported again.
reset()
}
85 changes: 48 additions & 37 deletions collector/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,63 +11,74 @@ import (
// This type implements the prometheus.Collector interface and can be
// registered in the metrics collection.
//
// The executor takes the time needed by each registered collector and
// The executor takes the time needed for scraping nsqd stat endpoint and
// provides an extra metric for this. This metric is labeled with the
// result ("success" or "error") and the collector.
// scrape result ("success" or "error").
type NsqExecutor struct {
collectors map[string]Collector
nsqdURL string

collectors []StatsCollector
summary *prometheus.SummaryVec
mutex sync.RWMutex
}

// NewNsqExecutor creates a new executor for the NSQ metrics.
func NewNsqExecutor(namespace string) *NsqExecutor {
// NewNsqExecutor creates a new executor for collecting NSQ metrics.
func NewNsqExecutor(namespace, nsqdURL string) *NsqExecutor {
sum := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: "exporter",
Name: "scrape_duration_seconds",
Help: "Duration of a scrape job of the NSQ exporter",
}, []string{"result"})
prometheus.MustRegister(sum)
return &NsqExecutor{
collectors: make(map[string]Collector),
summary: prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: "exporter",
Name: "scape_duration_seconds",
Help: "Duration of a scrape job of the NSQ exporter",
}, []string{"collector", "result"}),
nsqdURL: nsqdURL,
summary: sum,
}
}

// AddCollector adds a new collector for the metrics collection.
// Each collector needs a unique name which is used as a label
// for the executor metric.
func (e *NsqExecutor) AddCollector(name string, c Collector) {
e.collectors[name] = c
// Use registers the stats collector c with the executor so that its
// stats are exposed to Prometheus. The collector list is modified
// while holding the executor's mutex.
func (e *NsqExecutor) Use(c StatsCollector) {
	e.mutex.Lock()
	e.collectors = append(e.collectors, c)
	e.mutex.Unlock()
}

// Describe implements the prometheus.Collector interface.
func (e *NsqExecutor) Describe(out chan<- *prometheus.Desc) {
e.summary.Describe(out)
// Describe implements the prometheus.Collector interface by forwarding
// the descriptors of every registered stats collector to ch.
func (e *NsqExecutor) Describe(ch chan<- *prometheus.Desc) {
	// Use appends to e.collectors under e.mutex, so hold the read lock
	// while iterating to avoid racing with a concurrent registration.
	e.mutex.RLock()
	defer e.mutex.RUnlock()

	for _, c := range e.collectors {
		c.describe(ch)
	}
}

// Collect implements the prometheus.Collector interface.
func (e *NsqExecutor) Collect(out chan<- prometheus.Metric) {
var wg sync.WaitGroup
wg.Add(len(e.collectors))
for name, coll := range e.collectors {
go func(name string, coll Collector) {
e.exec(name, coll, out)
wg.Done()
}(name, coll)
start := time.Now()
e.mutex.Lock()
defer e.mutex.Unlock()

// reset state, because previously exported metrics may be gone
for _, c := range e.collectors {
c.reset()
}
wg.Wait()
}

func (e *NsqExecutor) exec(name string, coll Collector, out chan<- prometheus.Metric) {
start := time.Now()
err := coll.Collect(out)
dur := time.Since(start)
stats, err := getNsqdStats(e.nsqdURL)
tScrape := time.Since(start).Seconds()

labels := prometheus.Labels{"collector": name}
result := "success"
if err != nil {
labels["result"] = "error"
} else {
labels["result"] = "success"
result = "error"
}

e.summary.With(labels).Observe(dur.Seconds())
e.summary.WithLabelValues(result).Observe(tScrape)

if err == nil {
for _, c := range e.collectors {
c.set(stats)
}
for _, c := range e.collectors {
c.collect(out)
}
}
}
62 changes: 0 additions & 62 deletions collector/nsqd.go

This file was deleted.

25 changes: 21 additions & 4 deletions collector/stats_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ type channelStats []struct {
// expose the channel metrics of a nsqd node to Prometheus. The
// channel metrics are reported per topic.
func ChannelStats(namespace string) StatsCollector {
labels := []string{"type", "topic", "channel", "paused"}
labels := []string{"topic", "channel", "paused"}
namespace += "_channel"

return channelStats{
{
Expand Down Expand Up @@ -101,20 +102,36 @@ func ChannelStats(namespace string) StatsCollector {
}
}

func (cs channelStats) collect(s *stats, out chan<- prometheus.Metric) {
func (cs channelStats) set(s *stats) {
for _, topic := range s.Topics {
for _, channel := range topic.Channels {
labels := prometheus.Labels{
"type": "channel",
"topic": topic.Name,
"channel": channel.Name,
"paused": strconv.FormatBool(channel.Paused),
}

for _, c := range cs {
c.vec.With(labels).Set(c.val(channel))
c.vec.Collect(out)
}
}
}
}

// collect sends the current samples of every channel metric vector
// to out.
func (cs channelStats) collect(out chan<- prometheus.Metric) {
	for i := range cs {
		cs[i].vec.Collect(out)
	}
}

// describe sends the descriptor of every channel metric vector to ch.
func (cs channelStats) describe(ch chan<- *prometheus.Desc) {
	for i := range cs {
		cs[i].vec.Describe(ch)
	}
}

// reset clears every channel metric vector, dropping all label
// combinations recorded so far.
func (cs channelStats) reset() {
	for i := range cs {
		cs[i].vec.Reset()
	}
}
25 changes: 21 additions & 4 deletions collector/stats_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ type clientStats []struct {
// Prometheus collection process. So be sure the number of clients
// is small enough when using this collector.
func ClientStats(namespace string) StatsCollector {
labels := []string{"type", "topic", "channel", "deflate", "snappy", "tls", "client_id", "hostname", "version", "remote_address"}
labels := []string{"topic", "channel", "deflate", "snappy", "tls", "client_id", "hostname", "version", "remote_address"}
namespace += "_client"

return clientStats{
{
Expand Down Expand Up @@ -90,12 +91,11 @@ func ClientStats(namespace string) StatsCollector {
}
}

func (cs clientStats) collect(s *stats, out chan<- prometheus.Metric) {
func (cs clientStats) set(s *stats) {
for _, topic := range s.Topics {
for _, channel := range topic.Channels {
for _, client := range channel.Clients {
labels := prometheus.Labels{
"type": "client",
"topic": topic.Name,
"channel": channel.Name,
"deflate": strconv.FormatBool(client.Deflate),
Expand All @@ -109,9 +109,26 @@ func (cs clientStats) collect(s *stats, out chan<- prometheus.Metric) {

for _, c := range cs {
c.vec.With(labels).Set(c.val(client))
c.vec.Collect(out)
}
}
}
}
}

// collect sends the current samples of every client metric vector
// to out.
func (cs clientStats) collect(out chan<- prometheus.Metric) {
	for i := range cs {
		cs[i].vec.Collect(out)
	}
}

// describe sends the descriptor of every client metric vector to ch.
func (cs clientStats) describe(ch chan<- *prometheus.Desc) {
	for i := range cs {
		cs[i].vec.Describe(ch)
	}
}

// reset clears every client metric vector, dropping all label
// combinations recorded so far.
func (cs clientStats) reset() {
	for i := range cs {
		cs[i].vec.Reset()
	}
}
24 changes: 20 additions & 4 deletions collector/stats_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ type topicStats []struct {
// TopicStats creates a new stats collector which is able to
// expose the topic metrics of a nsqd node to Prometheus.
func TopicStats(namespace string) StatsCollector {
labels := []string{"type", "topic", "paused"}
labels := []string{"topic", "paused"}
namespace += "_topic"

return topicStats{
{
Expand Down Expand Up @@ -68,17 +69,32 @@ func TopicStats(namespace string) StatsCollector {
}
}

func (ts topicStats) collect(s *stats, out chan<- prometheus.Metric) {
func (ts topicStats) set(s *stats) {
for _, topic := range s.Topics {
labels := prometheus.Labels{
"type": "topic",
"topic": topic.Name,
"paused": strconv.FormatBool(topic.Paused),
}

for _, c := range ts {
c.vec.With(labels).Set(c.val(topic))
c.vec.Collect(out)
}
}
}
// collect sends the current samples of every topic metric vector
// to out.
func (ts topicStats) collect(out chan<- prometheus.Metric) {
	for i := range ts {
		ts[i].vec.Collect(out)
	}
}

// describe sends the descriptor of every topic metric vector to ch.
func (ts topicStats) describe(ch chan<- *prometheus.Desc) {
	for i := range ts {
		ts[i].vec.Describe(ch)
	}
}

// reset clears every topic metric vector, dropping all label
// combinations recorded so far.
func (ts topicStats) reset() {
	for i := range ts {
		ts[i].vec.Reset()
	}
}
Loading

0 comments on commit f5cc502

Please sign in to comment.