Skip to content

Commit

Permalink
Refactor data stream collector
Browse files Browse the repository at this point in the history
- Move metric DESC to vars to aid in unused linter checks
- Use new Collector interface

Signed-off-by: Joe Adams <[email protected]>
  • Loading branch information
sysadmind committed Mar 5, 2025
1 parent 4301b8d commit f4f9bc3
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 147 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
BREAKING CHANGES:

The flag `--es.slm` has been renamed to `--collector.slm`.
The flag `--es.data_stream` has been renamed to `--collector.data-stream`.

The logging system has been replaced with log/slog from the stdlib. This change is being made across the prometheus ecosystem. The logging output has changed, but the messages and levels remain the same. The `ts` label for the timestamp has been replaced with `time`, its precision is lower, and the timezone is no longer forced to UTC. The `caller` field has been replaced by the `source` field, which now includes the full path to the source file. The `level` field now exposes the log level in capital letters.

* [CHANGE] Rename --es.slm to --collector.slm #932
* [CHANGE] Rename --es.data_stream to --collector.data-stream #983
* [CHANGE] Replace logging system #942
* [ENHANCEMENT] Add external refresh stats #933

Expand Down
167 changes: 67 additions & 100 deletions collector/data_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,141 +14,108 @@
package collector

import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"path"

"github.com/prometheus/client_golang/prometheus"
)

// dataStreamMetric bundles a metric description with functions that
// extract the metric value and label values from one data stream's
// stats entry.
// NOTE(review): shown as deleted diff lines — this commit removes the
// per-metric table in favor of package-level prometheus.Desc vars.
type dataStreamMetric struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Value func(dataStreamStats DataStreamStatsDataStream) float64
Labels func(dataStreamStats DataStreamStatsDataStream) []string
}

var (
// NOTE(review): the two label helpers below are deleted diff lines —
// they supported the old dataStreamMetric table removed by this commit.
defaultDataStreamLabels = []string{"data_stream"}
defaultDataStreamLabelValues = func(dataStreamStats DataStreamStatsDataStream) []string {
return []string{dataStreamStats.DataStream}
}
// Metric descriptions are package-level vars so unused-variable linter
// checks can see them (per the commit message). Both carry a single
// "data_stream" label.
dataStreamBackingIndicesTotal = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "data_stream", "backing_indices_total"),
"Number of backing indices",
[]string{"data_stream"},
nil,
)
dataStreamStoreSizeBytes = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "data_stream", "store_size_bytes"),
"Store size of data stream",
[]string{"data_stream"},
nil,
)
)

// init registers the data stream collector under the "data-stream"
// name, disabled by default (matches the renamed
// --collector.data-stream flag noted in the CHANGELOG).
func init() {
registerCollector("data-stream", defaultDisabled, NewDataStream)
}

// DataStream Information Struct
// DataStream gathers Elasticsearch data stream statistics and exposes
// them as Prometheus metrics.
type DataStream struct {
logger *slog.Logger
// NOTE(review): client/url and dataStreamMetrics are the pre-refactor
// fields (deleted diff lines); this commit replaces them with the
// hc/u fields below.
client *http.Client
url *url.URL

dataStreamMetrics []*dataStreamMetric
hc *http.Client
u *url.URL
}

// NewDataStream defines DataStream Prometheus metrics
func NewDataStream(logger *slog.Logger, client *http.Client, url *url.URL) *DataStream {
func NewDataStream(logger *slog.Logger, u *url.URL, hc *http.Client) (Collector, error) {
return &DataStream{
logger: logger,
client: client,
url: url,

dataStreamMetrics: []*dataStreamMetric{
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "data_stream", "backing_indices_total"),
"Number of backing indices",
defaultDataStreamLabels, nil,
),
Value: func(dataStreamStats DataStreamStatsDataStream) float64 {
return float64(dataStreamStats.BackingIndices)
},
Labels: defaultDataStreamLabelValues,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "data_stream", "store_size_bytes"),
"Store size of data stream",
defaultDataStreamLabels, nil,
),
Value: func(dataStreamStats DataStreamStatsDataStream) float64 {
return float64(dataStreamStats.StoreSizeBytes)
},
Labels: defaultDataStreamLabelValues,
},
},
}
hc: hc,
u: u,
}, nil
}

// Describe adds DataStream metrics descriptions
// NOTE(review): deleted by this commit — the new Collector interface
// does not use a Describe method here. The diff cuts this method off
// before its closing brace; this fragment is residue, not live code.
func (ds *DataStream) Describe(ch chan<- *prometheus.Desc) {
for _, metric := range ds.dataStreamMetrics {
ch <- metric.Desc
}
// DataStreamStatsResponse is a representation of the Data Stream stats
// payload — presumably the Elasticsearch /_data_stream/*/_stats
// response body (the path the collector requests); verify against the
// ES API docs.
type DataStreamStatsResponse struct {
Shards DataStreamStatsShards `json:"_shards"`
DataStreamCount int64 `json:"data_stream_count"`
BackingIndices int64 `json:"backing_indices"`
TotalStoreSizeBytes int64 `json:"total_store_size_bytes"`
DataStreamStats []DataStreamStatsDataStream `json:"data_streams"`
}

// NOTE(review): deleted diff lines — the old fetch-and-decode helper
// removed by this commit; only fragments of it remain in the diff,
// its work now done inline by Update via getURL.
func (ds *DataStream) fetchAndDecodeDataStreamStats() (DataStreamStatsResponse, error) {
var dsr DataStreamStatsResponse
// DataStreamStatsShards defines data stream stats shards information structure
// (the "_shards" object of the stats response: total/successful/failed
// shard counts for the request).
type DataStreamStatsShards struct {
Total int64 `json:"total"`
Successful int64 `json:"successful"`
Failed int64 `json:"failed"`
}

// NOTE(review): deleted diff lines — body fragment of the removed
// fetchAndDecodeDataStreamStats helper (manual URL build + GET +
// error wrapping), superseded by getURL in Update.
u := *ds.url
u.Path = path.Join(u.Path, "/_data_stream/*/_stats")
res, err := ds.client.Get(u.String())
if err != nil {
return dsr, fmt.Errorf("failed to get data stream stats health from %s://%s:%s%s: %s",
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
}
// DataStreamStatsDataStream defines the structure of per data stream stats
// (one element of the "data_streams" array: name, backing index count,
// store size in bytes, and the stream's maximum timestamp).
type DataStreamStatsDataStream struct {
DataStream string `json:"data_stream"`
BackingIndices int64 `json:"backing_indices"`
StoreSizeBytes int64 `json:"store_size_bytes"`
MaximumTimestamp int64 `json:"maximum_timestamp"`
}

// NOTE(review): deleted diff lines — remaining fragment of the removed
// fetchAndDecodeDataStreamStats helper (body close + status check).
defer func() {
err = res.Body.Close()
if err != nil {
ds.logger.Warn(
"failed to close http.Client",
"err", err,
)
}
}()

if res.StatusCode != http.StatusOK {
return dsr, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
}
func (ds *DataStream) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
var dsr DataStreamStatsResponse

u := ds.u.ResolveReference(&url.URL{Path: "/_data_stream/*/_stats"})

bts, err := io.ReadAll(res.Body)
resp, err := getURL(ctx, ds.hc, ds.logger, u.String())
if err != nil {
return dsr, err
return err
}

if err := json.Unmarshal(bts, &dsr); err != nil {
return dsr, err
if err := json.Unmarshal(resp, &dsr); err != nil {
return err
}

return dsr, nil
}
for _, dataStream := range dsr.DataStreamStats {
fmt.Printf("Metric: %+v", dataStream)

// Collect gets DataStream metric values
func (ds *DataStream) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
dataStreamBackingIndicesTotal,
prometheus.CounterValue,
float64(dataStream.BackingIndices),
dataStream.DataStream,
)

dataStreamStatsResp, err := ds.fetchAndDecodeDataStreamStats()
if err != nil {
ds.logger.Warn(
"failed to fetch and decode data stream stats",
"err", err,
ch <- prometheus.MustNewConstMetric(
dataStreamStoreSizeBytes,
prometheus.CounterValue,
float64(dataStream.StoreSizeBytes),
dataStream.DataStream,
)
return
}

for _, metric := range ds.dataStreamMetrics {
for _, dataStream := range dataStreamStatsResp.DataStreamStats {
fmt.Printf("Metric: %+v", dataStream)
ch <- prometheus.MustNewConstMetric(
metric.Desc,
metric.Type,
metric.Value(dataStream),
metric.Labels(dataStream)...,
)
}
}

return nil

}
38 changes: 0 additions & 38 deletions collector/data_stream_response.go

This file was deleted.

4 changes: 2 additions & 2 deletions collector/data_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ func TestDataStream(t *testing.T) {
t.Fatal(err)
}

c := NewDataStream(promslog.NewNopLogger(), http.DefaultClient, u)
c, err := NewDataStream(promslog.NewNopLogger(), u, http.DefaultClient)
if err != nil {
t.Fatal(err)
}

if err := testutil.CollectAndCompare(c, strings.NewReader(tt.want)); err != nil {
if err := testutil.CollectAndCompare(wrapCollector{c}, strings.NewReader(tt.want)); err != nil {
t.Fatal(err)
}
})
Expand Down
7 changes: 0 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ func main() {
esExportShards = kingpin.Flag("es.shards",
"Export stats for shards in the cluster (implies --es.indices).").
Default("false").Bool()
esExportDataStream = kingpin.Flag("es.data_stream",
"Export stats for Data Streams.").
Default("false").Bool()
esClusterInfoInterval = kingpin.Flag("es.clusterinfo.interval",
"Cluster info update interval for the cluster label").
Default("5m").Duration()
Expand Down Expand Up @@ -217,10 +214,6 @@ func main() {
}
}

if *esExportDataStream {
prometheus.MustRegister(collector.NewDataStream(logger, httpClient, esURL))
}

if *esExportIndicesSettings {
prometheus.MustRegister(collector.NewIndicesSettings(logger, httpClient, esURL))
}
Expand Down

0 comments on commit f4f9bc3

Please sign in to comment.