Support importing cumulative metrics (fix google#13)
knyar committed Nov 23, 2018
1 parent 64204b9 commit 05f48ab
Showing 8 changed files with 431 additions and 100 deletions.
66 changes: 63 additions & 3 deletions README.md
@@ -174,8 +174,11 @@ metric:
* `destination`: name of the Stackdriver destination that query result will be
written to. Destinations need to be explicitly listed in the
`stackdriver_destinations` section of the configuration file.
* `cumulative`: a boolean flag describing whether the query result should be
imported as a cumulative metric (a monotonically increasing counter). See
the [Cumulative metrics](#cumulative-metrics) section below for more details.
All parameters are required.
All parameters are required, except for `cumulative`, which defaults to `false`.
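For example, a cumulative metric might be configured like this (a sketch: the
exact `metrics.yaml` layout and the placeholder values are assumed here, while
`query`, `api_key`, `application_key`, `destination` and `cumulative` are the
parameters documented in this section):

```yaml
datadog_metrics:
  - name: http_requests
    query: "cumsum(sum:http_requests{*}.as_count().rollup(sum, 60))"
    api_key: "DATADOG_API_KEY"
    application_key: "DATADOG_APPLICATION_KEY"
    destination: stackdriver
    cumulative: true
```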
Please keep in mind the following details about the Datadog API:
@@ -201,7 +204,7 @@ Please keep in mind the following details about Datadog API:
[alignment period](https://cloud.google.com/monitoring/charts/metrics-selector#alignment)
shorter than 1 minute.
## Metric Targets
## Metric Destinations
### Stackdriver
@@ -262,14 +265,71 @@ the `env_variables` section of `app/app.yaml`.
high might result in the App Engine instance running out of RAM.
* `DATADOG_MIN_POINT_AGE`: minimum age of a data point returned by Datadog
that makes it eligible for being written. Points that are very fresh
(default is 1 minute) are ignored, since Datadog might return incomplete
(default is 1.5 minutes) are ignored, since Datadog might return incomplete
data for them if some input data is delayed.
* `DATADOG_COUNTER_RESET_INTERVAL`: while importing counters, ts-bridge needs
to reset 'start time' regularly to keep the query time window small enough
to avoid [aggregation](https://docs.datadoghq.com/graphing/faq/what-is-the-granularity-of-my-graphs-am-i-seeing-raw-data-or-aggregates-on-my-graph/)
on the Datadog side. This parameter defines how often a new start time is
chosen. 30 minutes should be sufficient for metrics that have a point
every 10 seconds. See the [Cumulative metrics](#cumulative-metrics) section
below for more details.
* `ENABLE_STATUS_PAGE`: can be set to 'yes' to enable the status web page
(disabled by default).
You can use the `--env_var` flag to override these environment variables while
running the app via `dev_appserver.py`.
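For example, to run locally with a longer reset interval and the status page
enabled (a command-line sketch; adjust the path to `app.yaml` as needed):

```sh
dev_appserver.py --env_var DATADOG_COUNTER_RESET_INTERVAL=1h \
    --env_var ENABLE_STATUS_PAGE=yes app/app.yaml
```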
# Cumulative metrics
Stackdriver supports cumulative metrics, which are monotonically increasing
counters. Such metrics allow calculating deltas and rates over different
[alignment periods](https://cloud.google.com/monitoring/custom-metrics/reading-metrics#aligning).
While Datadog does not have first-class support for cumulative metrics, it
is possible to use the
[cumsum](https://docs.datadoghq.com/graphing/functions/arithmetic/#cumulative-sum)
query function to retrieve a cumulative sum. Time Series Bridge can take the
results of such queries and import them as cumulative metrics, but such
queries need to be explicitly annotated in `metrics.yaml` by setting the
`cumulative` option to `true` (ts-bridge rejects a cumulative metric whose
query does not contain the `cumsum` function).
For queries that are marked as `cumulative`, ts-bridge will regularly
choose a 'start time' and then issue queries with that time passed in
the [from](https://docs.datadoghq.com/api/?lang=python#query-timeseries-points)
API parameter. As a result, Datadog will return a monotonically
increasing time series with a sum of all measurements since 'start time'.
To avoid aggregation of multiple points into one on the Datadog side, the
'start time' is regularly moved forward, keeping the query time window
short (see `DATADOG_COUNTER_RESET_INTERVAL`).
Such resets are handled correctly by Stackdriver, since it requires an
explicit start time to be provided for cumulative metric points.
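For example, with the 30-minute reset interval set in `app/app.yaml`, a start
time chosen at 12:00:00 keeps being reused until about 12:30:00, when
ts-bridge picks a new one just after the last written point. Each cumulative
point written to Stackdriver then carries that start time explicitly. Below is
a minimal sketch (not part of ts-bridge itself; import paths are assumed) of
what such a point looks like, using the same protobuf types as the importer:

```go
package main

import (
	"fmt"
	"time"

	"github.com/golang/protobuf/ptypes"
	monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)

func main() {
	// Counter start time: kept for up to DATADOG_COUNTER_RESET_INTERVAL
	// before ts-bridge moves it forward.
	start, _ := ptypes.TimestampProto(time.Now().Add(-15 * time.Minute))
	// End time: the timestamp of the individual measurement.
	end, _ := ptypes.TimestampProto(time.Now())

	point := &monitoringpb.Point{
		// A CUMULATIVE point carries an explicit StartTime; a GAUGE point
		// would set only EndTime.
		Interval: &monitoringpb.TimeInterval{StartTime: start, EndTime: end},
		Value: &monitoringpb.TypedValue{
			Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 42},
		},
	}
	fmt.Println(point)
}
```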
Often, for Datadog to provide a cumulative sum of all measurements,
the `.as_count()` suffix needs to be appended to the metric name. Otherwise,
measurements might be provided as per-second rates rather than exact counts.
For metrics that have measurements more often than every minute, you might
also want to append the `.rollup()` function as
[described below](#writing-points-to-stackdriver-too-frequently).
For example, to import the counter metric called `http_requests` as a
cumulative metric to Stackdriver, you might configure the following query in
ts-bridge (and set `cumulative` to `true`):
cumsum(sum:http_requests{*}.as_count().rollup(sum, 60))
To unpack this:
* `cumsum()` makes Datadog return a cumulative sum of measurements;
* `sum:` prefix ensures that sum is used as the aggregation method if there
are multiple time series with the same metric name but different tags
(for example, reported from different machines);
* `.as_count()` suffix gathers actual measurements rather than per-second
rates;
* `.rollup(sum, 60)` aggregates values into 60-second intervals in case
there are multiple measurements for this metric reported per minute.
# Status Page
If the `ENABLE_STATUS_PAGE` environment variable is set to 'yes', the index page
6 changes: 5 additions & 1 deletion app/app.yaml
@@ -18,7 +18,11 @@ env_variables:
UPDATE_PARALLELISM: 1
# Points received from Datadog that are too fresh will be discarded to allow data to be fully processed and aggregated
# on Datadog side. This variable defines the threshold for "too fresh".
DATADOG_MIN_POINT_AGE: "1m"
DATADOG_MIN_POINT_AGE: "90s"
# While importing counters, ts-bridge needs to reset 'start time' regularly to keep the query time window small enough
# to avoid aggregation. This parameter defines how often a new start time is chosen. 30min should be sufficient for
# metrics that have a point every 10 seconds.
DATADOG_COUNTER_RESET_INTERVAL: "30m"
# Uncomment to enable the status web page.
#ENABLE_STATUS_PAGE: "yes"

11 changes: 10 additions & 1 deletion app/main.go
@@ -151,7 +151,16 @@ func newConfig(ctx context.Context) (*tsbridge.Config, error) {
return nil, fmt.Errorf("Could not parse DATADOG_MIN_POINT_AGE: %v", err)
}

return tsbridge.NewConfig(ctx, os.Getenv("CONFIG_FILE"), ddMinPointAge)
ddResetInterval, err := time.ParseDuration(os.Getenv("DATADOG_COUNTER_RESET_INTERVAL"))
if err != nil {
return nil, fmt.Errorf("Could not parse DATADOG_COUNTER_RESET_INTERVAL: %v", err)
}

return tsbridge.NewConfig(ctx, &tsbridge.ConfigOptions{
Filename: os.Getenv("CONFIG_FILE"),
DatadogMinPointAge: ddMinPointAge,
DatadogCounterResetInterval: ddResetInterval,
})
}

// Since some URLs are triggered by App Engine cron, error messages returned in HTTP response
135 changes: 106 additions & 29 deletions datadog/metric.go
@@ -17,6 +17,7 @@ package datadog
import (
"context"
"fmt"
"strings"
"time"

"github.com/google/ts-bridge/record"
@@ -33,28 +34,35 @@ import (

// Metric defines a Datadog-based metric. It implements the SourceMetric interface.
type Metric struct {
Name string
config *MetricConfig
client *ddapi.Client
minPointAge time.Duration
Name string
config *MetricConfig
client *ddapi.Client
minPointAge time.Duration
counterResetInterval time.Duration
}

// MetricConfig defines configuration file parameters for a specific metric imported from Datadog.
type MetricConfig struct {
APIKey string `yaml:"api_key" validate:"nonzero"`
ApplicationKey string `yaml:"application_key" validate:"nonzero"`
Query string `validate:"nonzero"`
Cumulative bool
}

// NewSourceMetric creates a new SourceMetric from a metric name and configuration parameters.
func NewSourceMetric(name string, config *MetricConfig, minPointAge time.Duration) *Metric {
func NewSourceMetric(name string, config *MetricConfig, minPointAge, counterResetInterval time.Duration) (*Metric, error) {
if config.Cumulative && !strings.Contains(config.Query, "cumsum") {
return nil, fmt.Errorf("Query for the cumulative metric %s does not contain the cumsum Datadog function", name)
}

client := ddapi.NewClient(config.APIKey, config.ApplicationKey)
return &Metric{
Name: name,
config: config,
client: client,
minPointAge: minPointAge,
}
Name: name,
config: config,
client: client,
minPointAge: minPointAge,
counterResetInterval: counterResetInterval,
}, nil
}

// StackdriverName returns the full Stackdriver metric name (also called "metric type") for this metric.
Expand All @@ -68,11 +76,21 @@ func (m *Metric) Query() string {
}

// StackdriverData issues a Datadog query, returning metric descriptor and time series data.
// Time series data will include points since the given timestamp.
func (m *Metric) StackdriverData(ctx context.Context, since time.Time, record *record.MetricRecord) (*metricpb.MetricDescriptor, []*monitoringpb.TimeSeries, error) {
// Time series data will include points after the given lastPoint timestamp.
func (m *Metric) StackdriverData(ctx context.Context, lastPoint time.Time, rec record.MetricRecord) (*metricpb.MetricDescriptor, []*monitoringpb.TimeSeries, error) {
m.client.HttpClient = urlfetch.Client(ctx)

// Datadog's `from` parameter is inclusive, so we set it to 1 second after the latest point we've got.
series, err := m.client.QueryMetrics(since.Unix()+1, time.Now().Unix(), m.config.Query)
from := lastPoint.Add(time.Second)
if m.config.Cumulative {
var err error
from, err = m.counterStartTime(ctx, lastPoint, rec)
if err != nil {
return nil, nil, err
}
}

series, err := m.client.QueryMetrics(from.Unix(), time.Now().Unix(), m.config.Query)
if err != nil {
return nil, nil, err
}
@@ -82,34 +100,91 @@ func (m *Metric) StackdriverData(ctx context.Context, since time.Time, record *r
} else if len(series) > 1 {
return nil, nil, fmt.Errorf("Query '%s' returned %d time series", m.config.Query, len(series))
}
points, err := m.filterPoints(series[0].Points)

points, err := m.filterPoints(lastPoint, series[0].Points)
if err != nil {
return nil, nil, err
}
log.Debugf(ctx, "Got %d points (%d after filtering) in response to the Datadog query '%s'", len(series[0].Points), len(points), m.config.Query)
return m.metricDescriptor(series[0]), m.convertTimeSeries(points), nil

startTime, err := ptypes.TimestampProto(from)
if err != nil {
return nil, nil, fmt.Errorf("Count not convert timestamp %v to proto: %v", from, err)
}
return m.metricDescriptor(series[0]), m.convertTimeSeries(startTime, points), nil
}

// filterPoints gets a slice of Datadog points, and returns a similar slice, but without points that are too fresh.
func (m *Metric) filterPoints(points []ddapi.DataPoint) ([]ddapi.DataPoint, error) {
// counterStartTime returns the start time for a cumulative metric. It's used as
// the `from` parameter while issuing Datadog queries, and also as the `start
// time` field in points reported for this cumulative metric to SD.
func (m *Metric) counterStartTime(ctx context.Context, lastPoint time.Time, rec record.MetricRecord) (time.Time, error) {
// Start time needs to be reset regularly, since otherwise we will be querying
// Datadog for a time window large enough for aggregation to kick in.
if time.Now().Sub(rec.GetCounterStartTime()) > m.counterResetInterval {
var start time.Time
if time.Now().Sub(lastPoint) <= m.counterResetInterval {
// This is the common case: choose the new start time based on the last point
// timestamp. This ensures continuity of data.
// Datadog's timestamps have 1-second granularity, and timestamp X covers
// data between X and X+1s, so we increment last point timestamp by 1 second
// while choosing a new start time.
start = lastPoint.Add(time.Second)
} else {
// This is the rare case: when last point is too old, we cannot use it as a
// basis for new start time, since it will make new start time still older
// than ResetInterval, and it will immediately need to be moved forward
// again. This only happens when a new metric is added, or when writes to
// Stackdriver have been failing for more than ResetInterval.
// We need to choose an arbitrary point in the recent past as the new start
// time, and we select half of the reset interval: this ensures that we
// backfill some data, but won't need to reset the start time again for a
// while.
start = time.Now().Add(-m.counterResetInterval / 2)
}
if err := rec.SetCounterStartTime(ctx, start); err != nil {
return time.Time{}, fmt.Errorf("Could not set counter start time: %v", err)
}
log.Infof(ctx, "Counter start time for %s has been reset to %v", m.Name, start)
}
return rec.GetCounterStartTime(), nil
}

// filterPoints gets a slice of Datadog points, and returns a similar slice, but without points that are too fresh or
// too old.
func (m *Metric) filterPoints(lastPoint time.Time, points []ddapi.DataPoint) ([]ddapi.DataPoint, error) {
var output []ddapi.DataPoint
for _, p := range points {
ts, err := ptypes.Timestamp(pointTimestamp(p))
if err != nil {
return nil, fmt.Errorf("Could not parse point timestamp for %v: %v", p, err)
}
if time.Now().Sub(ts) >= m.minPointAge {
output = append(output, p)
// Ignore points that are too fresh, since they might contain incomplete data.
if time.Now().Sub(ts) < m.minPointAge {
continue
}
// Ignore points that are equal to or older than the last written point. For gauge metrics this is a noop,
// since we only query Datadog for new points, but for cumulative metrics this is where we discard already
// written data (we still need to pull it from Datadog for it to return us a cumulative sum).
if lastPoint.Sub(ts) >= 0 {
continue
}
output = append(output, p)
}
return output, nil
}

// metricKind returns the Stackdriver metric kind for this metric.
func (m *Metric) metricKind() metricpb.MetricDescriptor_MetricKind {
if m.config.Cumulative {
return metricpb.MetricDescriptor_CUMULATIVE
}
return metricpb.MetricDescriptor_GAUGE
}

// metricDescriptor creates a Stackdriver MetricDescriptor based on a Datadog series.
func (m *Metric) metricDescriptor(series ddapi.Series) *metricpb.MetricDescriptor {
d := &metricpb.MetricDescriptor{
// Name does not need to be set here; it will be set by Stackdriver Adapter based on the Stackdriver
// project that this metric is written to.
Type: m.StackdriverName(),
// Query results are gauges; there does not seem to be a way to get cumulative metrics from Datadog.
MetricKind: metricpb.MetricDescriptor_GAUGE,
Type: m.StackdriverName(),
MetricKind: m.metricKind(),
// Datadog API does not declare value type, and the client library exposes all points as float64.
ValueType: metricpb.MetricDescriptor_DOUBLE,
Description: fmt.Sprintf("Datadog query: %s", m.config.Query),
Expand All @@ -131,26 +206,28 @@ func (m *Metric) metricDescriptor(series ddapi.Series) *metricpb.MetricDescripto
// A separate TimeSeries message is created for each point because Stackdriver only allows sending a single
// point in a given request for each time series, so multiple points will need to be sent as separate requests.
// See https://cloud.google.com/monitoring/custom-metrics/creating-metrics#writing-ts
func (m *Metric) convertTimeSeries(points []ddapi.DataPoint) []*monitoringpb.TimeSeries {
func (m *Metric) convertTimeSeries(start *timestamp.Timestamp, points []ddapi.DataPoint) []*monitoringpb.TimeSeries {
ts := make([]*monitoringpb.TimeSeries, 0, len(points))
for _, p := range points {
ts = append(ts, &monitoringpb.TimeSeries{
Metric: &metricpb.Metric{Type: m.StackdriverName()},
Resource: &monitoredres.MonitoredResource{Type: "global"},
MetricKind: metricpb.MetricDescriptor_GAUGE,
MetricKind: m.metricKind(),
ValueType: metricpb.MetricDescriptor_DOUBLE,
Points: []*monitoringpb.Point{convertPoint(p)},
Points: []*monitoringpb.Point{m.convertPoint(start, p)},
})
}
return ts
}

// convertPoint converts a Datadog point into a Stackdriver point.
func convertPoint(p ddapi.DataPoint) *monitoringpb.Point {
func (m *Metric) convertPoint(start *timestamp.Timestamp, p ddapi.DataPoint) *monitoringpb.Point {
i := &monitoringpb.TimeInterval{EndTime: pointTimestamp(p)}
if m.config.Cumulative {
i.StartTime = start
}
return &monitoringpb.Point{
Interval: &monitoringpb.TimeInterval{
EndTime: pointTimestamp(p),
},
Interval: i,
Value: &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_DoubleValue{
DoubleValue: *p[1],