Skip to content

Commit

Permalink
feat: allow multiple queues with the same metric name
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe authored and moshloop committed Dec 12, 2024
1 parent b1b4bfd commit 4b33afe
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 32 deletions.
87 changes: 59 additions & 28 deletions collections/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ import (
"github.com/samber/lo"
)

var (
metricCache = map[string]any{}
metricCacheLock sync.Mutex
)

// MetricsOpts contains options for queue metrics
type MetricsOpts[T comparable] struct {
Labels map[string]any
Expand Down Expand Up @@ -87,22 +92,19 @@ func (m *metrics[T]) dequeue(item queueItem[T], currentSize int) {
m.dequeuedTotal.With(labels).Inc()
m.queueDuration.With(labels).Observe(float64(time.Since(item.inserted).Milliseconds()))
m.queueSize.Set(float64(currentSize))

}

func newMetrics[T comparable](opts MetricsOpts[T]) *metrics[T] {

keys := lo.Keys(opts.Labels)

labels := prometheus.Labels{}

for k, v := range opts.Labels {
labels[k] = fmt.Sprintf("%v", v)
}

for k := range opts.Labeller {
keys = append(keys, k)
}

if len(opts.DurationBuckets) == 0 {
opts.DurationBuckets = []float64{
1, 10, 50, 100, 500, 1000, 3 * 1000, 10 * 1000, 30 * 1000, 60 * 1000, 300 * 1000,
Expand All @@ -113,33 +115,63 @@ func newMetrics[T comparable](opts MetricsOpts[T]) *metrics[T] {
opts.Name = "priority_queue"
}

metricCacheLock.Lock()
defer metricCacheLock.Unlock()

return &metrics[T]{
opts: opts,
enqueuedTotal: promauto.NewCounterVec(prometheus.CounterOpts{
Name: opts.Name + "_enqueued_total",
Help: "The total number of enqueued items",
}, keys),
dedupedTotal: promauto.NewCounterVec(prometheus.CounterOpts{
Name: opts.Name + "_deduped_total",
Help: "The total number of enqueued items",
}, keys),
dequeuedTotal: promauto.NewCounterVec(prometheus.CounterOpts{
Name: opts.Name + "_dequeued_total",
Help: "The total number of dequeued items",
}, keys),
queueSize: promauto.NewGauge(prometheus.GaugeOpts{
Name: opts.Name + "_size",
Help: "The current size of the queue",
ConstLabels: labels,
}),
queueDuration: promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: opts.Name + "_duration",
Help: "Time an object spent in the queue in milliseconds",
Buckets: opts.DurationBuckets,
}, keys),
opts: opts,
enqueuedTotal: getOrCreateCounterVec(opts.Name, "enqueued_total", "The total number of enqueued items", keys),
dedupedTotal: getOrCreateCounterVec(opts.Name, "deduped_total", "The total number of deduped items", keys),
dequeuedTotal: getOrCreateCounterVec(opts.Name, "dequeued_total", "The total number of dequeued items", keys),
queueSize: getOrCreateGauge(opts.Name, "size", "The current size of the queue", labels),
queueDuration: getOrCreateHistogramVec(opts.Name, "duration", "Time an object spent in the queue in milliseconds", keys, opts.DurationBuckets),
}
}

func getOrCreateCounterVec(prefix, suffix, help string, keys []string) *prometheus.CounterVec {
name := fmt.Sprintf("%s_%s", prefix, suffix)
if val, ok := metricCache[name]; ok {
return val.(*prometheus.CounterVec)
}

counter := promauto.NewCounterVec(prometheus.CounterOpts{
Name: name,
Help: help,
}, keys)
metricCache[name] = counter
return counter
}

func getOrCreateGauge(prefix, suffix, help string, labels prometheus.Labels) prometheus.Gauge {
name := fmt.Sprintf("%s_%s", prefix, suffix)
if val, ok := metricCache[name]; ok {
return val.(prometheus.Gauge)
}

gauge := promauto.NewGauge(prometheus.GaugeOpts{
Name: name,
Help: help,
ConstLabels: labels,
})
metricCache[name] = gauge
return gauge
}

func getOrCreateHistogramVec(prefix, suffix, help string, keys []string, buckets []float64) *prometheus.HistogramVec {
name := fmt.Sprintf("%s_%s", prefix, suffix)
if val, ok := metricCache[name]; ok {
return val.(*prometheus.HistogramVec)
}

histogram := promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: name,
Help: help,
Buckets: buckets,
}, keys)
metricCache[name] = histogram
return histogram
}

type queueItem[T comparable] struct {
item T
inserted time.Time
Expand Down Expand Up @@ -215,7 +247,6 @@ func (queue *Queue[T]) Dequeue() (T, bool) {
queue.metrics.dequeue(wrapper, queue.heap.Size())

if queue.Dedupe {

// Keep dequeuing while next item is the same as current
for {
next, hasNext := queue.heap.Peek()
Expand Down
16 changes: 12 additions & 4 deletions collections/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func TestPriorityQueueString(t *testing.T) {
return i[0:1]
},
},
}})
},
})

g.Expect(err).To(BeNil())
g.Expect(pq.Size()).To(BeNumerically("==", 0))
Expand Down Expand Up @@ -75,7 +76,14 @@ func TestPriorityQueue(t *testing.T) {

pq, err := NewQueue(QueueOpts[*QueueItem]{
Metrics: MetricsOpts[*QueueItem]{
Name: "test",
Labels: map[string]any{
"const_label": "value1",
},
Labeller: map[string]func(i *QueueItem) string{
"prefix": func(i *QueueItem) string {
return "dummy"
},
},
},
Comparator: func(a, b *QueueItem) int {
return strings.Compare(a.Obj["name"].(string), b.Obj["name"].(string))
Expand Down Expand Up @@ -113,7 +121,8 @@ func TestPriorityQueueDedupe(t *testing.T) {
Comparator: strings.Compare,
Metrics: MetricsOpts[string]{
Name: "dedupe_queue",
}})
},
})

g.Expect(err).To(BeNil())
g.Expect(pq.Size()).To(BeNumerically("==", 0))
Expand All @@ -138,7 +147,6 @@ func TestPriorityQueueDedupe(t *testing.T) {
g.Expect("dedupe_queue_dequeued_total").To(matchers.MatchCounter(3))
g.Expect("dedupe_queue_deduped_total").To(matchers.MatchCounter(1))
g.Expect("dedupe_queue_size").To(matchers.MatchCounter(0))

}

func TestPriorityQueueConcurrency(t *testing.T) {
Expand Down

0 comments on commit 4b33afe

Please sign in to comment.