Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Daniil Stepanenko <[email protected]>
  • Loading branch information
Daniil Stepanenko committed Oct 14, 2024
1 parent 33656f8 commit 9a5cc26
Show file tree
Hide file tree
Showing 14 changed files with 7,036 additions and 325 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ docs
examples
scripts
test
libjq

.git
.gitignore
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@
.idea
*.iml
*.proj

# C Dependency
libjq
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/flant/shell-operator

go 1.19
go 1.22.8

require (
github.com/flant/kube-client v1.2.0
Expand Down Expand Up @@ -53,6 +53,7 @@ require (
github.com/go-stack/stack v1.8.0 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gojuno/minimock/v3 v3.4.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ github.com/gofrs/uuid/v5 v5.3.0 h1:m0mUMr+oVYUdxpMLgSYCZiXe7PuVPnI94+OMeVBNedk=
github.com/gofrs/uuid/v5 v5.3.0/go.mod h1:CDOjlDMVAtN56jqyRUZh58JT31Tiw7/oQyEXZV+9bD8=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/gojuno/minimock/v3 v3.4.0 h1:htPGQuFvmCaTygTnARPp5tSWZUZxOnu8A2RDVyl/LA8=
github.com/gojuno/minimock/v3 v3.4.0/go.mod h1:0PdkFMCugnywaAqwrdWMZMzHhSH3ZoXlMVHiRVdIrLk=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
Expand Down
298 changes: 298 additions & 0 deletions pkg/metric/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
package metric

import (
"hash/fnv"
"sort"
"sync"
"sync/atomic"

"github.com/prometheus/client_golang/prometheus"

. "github.com/flant/shell-operator/pkg/utils/labels"
)

type ConstCollector interface {
Describe(ch chan<- *prometheus.Desc)
Collect(ch chan<- prometheus.Metric)
Type() string
LabelNames() []string
Name() string
ExpireGroupMetrics(group string)
UpdateLabels([]string)
}

type GroupedCounterMetric struct {
Value uint64
LabelValues []string
Group string
}

type GroupedGaugeMetric struct {
Value float64
LabelValues []string
Group string
}

type ConstCounterCollector struct {
mtx sync.RWMutex

collection map[uint64]GroupedCounterMetric
desc *prometheus.Desc
name string
labelNames []string
}

func NewConstCounterCollector(name string, labelNames []string) *ConstCounterCollector {
desc := prometheus.NewDesc(name, name, labelNames, nil)
return &ConstCounterCollector{
name: name,
labelNames: labelNames,
desc: desc,
collection: make(map[uint64]GroupedCounterMetric),
}
}

// Add increases a counter metric by a value. Metric is identified by label values and a group.
func (c *ConstCounterCollector) Add(group string, value float64, labels map[string]string) {
c.mtx.Lock()
defer c.mtx.Unlock()

labelValues := LabelValues(labels, c.labelNames)
labelsHash := HashLabelValues(labelValues)

// TODO add group to hash
storedMetric, ok := c.collection[labelsHash]
if !ok {
storedMetric = GroupedCounterMetric{
Value: uint64(value),
LabelValues: labelValues,
Group: group,
}
} else {
atomic.AddUint64(&storedMetric.Value, uint64(value))
}

c.collection[labelsHash] = storedMetric
}

func (c *ConstCounterCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.desc
}

func (c *ConstCounterCollector) Collect(ch chan<- prometheus.Metric) {
c.mtx.RLock()
defer c.mtx.RUnlock()

for _, s := range c.collection {
ch <- prometheus.MustNewConstMetric(c.desc, prometheus.CounterValue, float64(s.Value), s.LabelValues...)
}
}

func (c *ConstCounterCollector) Type() string {
return "counter"
}

func (c *ConstCounterCollector) LabelNames() []string {
return c.labelNames
}

func (c *ConstCounterCollector) Name() string {
return c.name
}

// ExpireGroupMetrics deletes all metrics from collection with matched group.
func (c *ConstCounterCollector) ExpireGroupMetrics(group string) {
c.mtx.Lock()
defer c.mtx.Unlock()

for hash, m := range c.collection {
if m.Group == group {
delete(c.collection, hash)
}
}
}

// UpdateLabels checks if any new labels are provided to the controller and updates its description, labelNames list and collection.
// The collection is recalculated in accordance with new label list.
func (c *ConstCounterCollector) UpdateLabels(labels []string) {
c.mtx.Lock()
var mustUpdate bool
previousLabelsMap := make(map[string]int, len(c.labelNames))
for idx, label := range c.labelNames {
previousLabelsMap[label] = idx
}

previousLabelSet := make([]string, len(c.labelNames))
copy(previousLabelSet, c.labelNames)

for _, label := range labels {
if _, found := previousLabelsMap[label]; !found {
mustUpdate = true
c.labelNames = append(c.labelNames, label)
}
}
sort.Strings(c.labelNames)

if mustUpdate {
c.desc = prometheus.NewDesc(c.name, c.name, c.labelNames, nil)
newCollection := make(map[uint64]GroupedCounterMetric)
for hash, metric := range c.collection {
if len(metric.LabelValues) != len(c.labelNames) {
newLabelsValues := make([]string, 0, len(c.labelNames))
for _, labelName := range c.labelNames {
if idx, found := previousLabelsMap[labelName]; found {
newLabelsValues = append(newLabelsValues, metric.LabelValues[idx])
} else {
newLabelsValues = append(newLabelsValues, "")
}
}
newLabelsHash := HashLabelValues(newLabelsValues)
newCollection[newLabelsHash] = GroupedCounterMetric{
Value: metric.Value,
LabelValues: newLabelsValues,
Group: metric.Group,
}
} else {
newCollection[hash] = c.collection[hash]
}
}
c.collection = newCollection
}
c.mtx.Unlock()
}

type ConstGaugeCollector struct {
mtx sync.RWMutex

name string
labelNames []string
desc *prometheus.Desc
collection map[uint64]GroupedGaugeMetric
}

func NewConstGaugeCollector(name string, labelNames []string) *ConstGaugeCollector {
desc := prometheus.NewDesc(name, name, labelNames, nil)
return &ConstGaugeCollector{
name: name,
labelNames: labelNames,
desc: desc,
collection: make(map[uint64]GroupedGaugeMetric),
}
}

func (c *ConstGaugeCollector) Set(group string, value float64, labels map[string]string) {
c.mtx.Lock()
defer c.mtx.Unlock()

labelValues := LabelValues(labels, c.labelNames)
labelsHash := HashLabelValues(labelValues)

storedMetric, ok := c.collection[labelsHash]
if !ok {
storedMetric = GroupedGaugeMetric{
Value: value,
LabelValues: labelValues,
Group: group,
}
}

storedMetric.Value = value
c.collection[labelsHash] = storedMetric
}

func (c *ConstGaugeCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.desc
}

func (c *ConstGaugeCollector) Collect(ch chan<- prometheus.Metric) {
c.mtx.RLock()
defer c.mtx.RUnlock()

for _, s := range c.collection {
ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, s.Value, s.LabelValues...)
}
}

func (c *ConstGaugeCollector) Type() string {
return "gauge"
}

func (c *ConstGaugeCollector) LabelNames() []string {
return c.labelNames
}

func (c *ConstGaugeCollector) Name() string {
return c.name
}

// ExpireGroupMetrics deletes all metrics from collection with matched group.
func (c *ConstGaugeCollector) ExpireGroupMetrics(group string) {
c.mtx.Lock()
defer c.mtx.Unlock()

for hash, m := range c.collection {
if m.Group == group {
delete(c.collection, hash)
}
}
}

// UpdateLabels checks if any new labels are provided to the controller and updates its description, labelNames list and collection.
// The collection is recalculated in accordance with new label list.
func (c *ConstGaugeCollector) UpdateLabels(labels []string) {
c.mtx.Lock()
var mustUpdate bool
previousLabelsMap := make(map[string]int, len(c.labelNames))
for idx, label := range c.labelNames {
previousLabelsMap[label] = idx
}

previousLabelSet := make([]string, len(c.labelNames))
copy(previousLabelSet, c.labelNames)

for _, label := range labels {
if _, found := previousLabelsMap[label]; !found {
mustUpdate = true
c.labelNames = append(c.labelNames, label)
}
}
sort.Strings(c.labelNames)

if mustUpdate {
c.desc = prometheus.NewDesc(c.name, c.name, c.labelNames, nil)
newCollection := make(map[uint64]GroupedGaugeMetric)
for hash, metric := range c.collection {
if len(metric.LabelValues) != len(c.labelNames) {
newLabelsValues := make([]string, 0, len(c.labelNames))
for _, labelName := range c.labelNames {
if idx, found := previousLabelsMap[labelName]; found {
newLabelsValues = append(newLabelsValues, metric.LabelValues[idx])
} else {
newLabelsValues = append(newLabelsValues, "")
}
}
newLabelsHash := HashLabelValues(newLabelsValues)
newCollection[newLabelsHash] = GroupedGaugeMetric{
Value: metric.Value,
LabelValues: newLabelsValues,
Group: metric.Group,
}
} else {
newCollection[hash] = c.collection[hash]
}
}
c.collection = newCollection
}
c.mtx.Unlock()
}

const labelsSeparator = byte(255)

func HashLabelValues(labelValues []string) uint64 {
hasher := fnv.New64a()
for _, labelValue := range labelValues {
_, _ = hasher.Write([]byte(labelValue))
_, _ = hasher.Write([]byte{labelsSeparator})
}
return hasher.Sum64()
}
10 changes: 10 additions & 0 deletions pkg/metric/collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package metric_test

import (
"github.com/flant/shell-operator/pkg/metric"
)

var (
_ metric.ConstCollector = (*metric.ConstCounterCollector)(nil)
_ metric.ConstCollector = (*metric.ConstGaugeCollector)(nil)
)
Loading

0 comments on commit 9a5cc26

Please sign in to comment.