Skip to content

Commit

Permalink
feat: Add server runtime metrics aggregator (#9894)
Browse files Browse the repository at this point in the history
## Implements

- [x] 10 Second Bucketing: SDKs are required to bucket into 10 second
intervals (rollup in seconds) which is the current lower bound of metric
accuracy.
- [x] Flush Shift: SDKs are required to shift the flush interval by
random() * rollup_in_seconds. That shift is determined once per startup
to create jittering.
- [ ] Force flush: an SDK is required to perform force flushing ahead of
scheduled time if the memory pressure is too high. There is no rule for
this other than that SDKs should be tracking abstract aggregation
complexity (eg: a counter only carries a single float, whereas a
distribution is a float per emission).

## Caveats 

- Force flush requires Node.js 14+ support (FinalizationRegistry). I
recommend leaving it after v8 release to make the implementation a lot
easier.

---------

Co-authored-by: Abhijeet Prasad <[email protected]>
  • Loading branch information
anonrig and AbhiPrasad authored Dec 21, 2023
1 parent 5d16aae commit 7f8eca7
Show file tree
Hide file tree
Showing 20 changed files with 540 additions and 137 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
"clean:deps": "lerna clean --yes && rm -rf node_modules && yarn",
"clean:all": "run-s clean:build clean:caches clean:deps",
"codecov": "codecov",
"fix": "run-s fix:lerna fix:biome",
"fix": "run-p fix:lerna fix:biome",
"fix:lerna": "lerna run fix",
"fix:biome": "biome check --apply .",
"changelog": "ts-node ./scripts/get-commit-list.ts",
"link:yarn": "lerna exec yarn link",
"lint": "run-s lint:lerna lint:biome",
"lint": "run-p lint:lerna lint:biome",
"lint:lerna": "lerna run lint",
"lint:biome": "biome check .",
"validate:es5": "lerna run validate:es5",
Expand Down
1 change: 1 addition & 0 deletions packages/bun/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ export {
startInactiveSpan,
startSpanManual,
continueTrace,
metrics,
} from '@sentry/core';
export type { SpanStatusType } from '@sentry/core';
export { autoDiscoverNodePerformanceMonitoringIntegrations } from '@sentry/node';
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/baseclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ export abstract class BaseClient<O extends ClientOptions> implements Client<O> {
* @inheritDoc
*/
public captureAggregateMetrics(metricBucketItems: Array<MetricBucketItem>): void {
DEBUG_BUILD && logger.log(`Flushing aggregated metrics, number of metrics: ${metricBucketItems.length}`);
const metricsEnvelope = createMetricEnvelope(
metricBucketItems,
this._dsn,
Expand Down
163 changes: 163 additions & 0 deletions packages/core/src/metrics/aggregator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import type {
Client,
ClientOptions,
MeasurementUnit,
MetricsAggregator as MetricsAggregatorBase,
Primitive,
} from '@sentry/types';
import { timestampInSeconds } from '@sentry/utils';
import { DEFAULT_FLUSH_INTERVAL, MAX_WEIGHT, NAME_AND_TAG_KEY_NORMALIZATION_REGEX } from './constants';
import { METRIC_MAP } from './instance';
import type { MetricBucket, MetricType } from './types';
import { getBucketKey, sanitizeTags } from './utils';

/**
* A metrics aggregator that aggregates metrics in memory and flushes them periodically.
*/
export class MetricsAggregator implements MetricsAggregatorBase {
// TODO(@anonrig): Use FinalizationRegistry to have a proper way of flushing the buckets
// when the aggregator is garbage collected.
// Ref: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/FinalizationRegistry
private _buckets: MetricBucket;

// Different metrics have different weights. We use this to limit the number of metrics
// that we store in memory.
private _bucketsTotalWeight;

private readonly _interval: ReturnType<typeof setInterval>;

// SDKs are required to shift the flush interval by random() * rollup_in_seconds.
// That shift is determined once per startup to create jittering.
private readonly _flushShift: number;

// An SDK is required to perform force flushing ahead of scheduled time if the memory
// pressure is too high. There is no rule for this other than that SDKs should be tracking
// abstract aggregation complexity (eg: a counter only carries a single float, whereas a
// distribution is a float per emission).
//
// Force flush is used on either shutdown, flush() or when we exceed the max weight.
private _forceFlush: boolean;

public constructor(private readonly _client: Client<ClientOptions>) {
this._buckets = new Map();
this._bucketsTotalWeight = 0;
this._interval = setInterval(() => this._flush(), DEFAULT_FLUSH_INTERVAL);
this._flushShift = Math.floor((Math.random() * DEFAULT_FLUSH_INTERVAL) / 1000);
this._forceFlush = false;
}

/**
* @inheritDoc
*/
public add(
metricType: MetricType,
unsanitizedName: string,
value: number | string,
unit: MeasurementUnit = 'none',
unsanitizedTags: Record<string, Primitive> = {},
maybeFloatTimestamp = timestampInSeconds(),
): void {
const timestamp = Math.floor(maybeFloatTimestamp);
const name = unsanitizedName.replace(NAME_AND_TAG_KEY_NORMALIZATION_REGEX, '_');
const tags = sanitizeTags(unsanitizedTags);

const bucketKey = getBucketKey(metricType, name, unit, tags);
let bucketItem = this._buckets.get(bucketKey);
if (bucketItem) {
bucketItem.metric.add(value);
// TODO(abhi): Do we need this check?
if (bucketItem.timestamp < timestamp) {
bucketItem.timestamp = timestamp;
}
} else {
bucketItem = {
// @ts-expect-error we don't need to narrow down the type of value here, saves bundle size.
metric: new METRIC_MAP[metricType](value),
timestamp,
metricType,
name,
unit,
tags,
};
this._buckets.set(bucketKey, bucketItem);
}

// We need to keep track of the total weight of the buckets so that we can
// flush them when we exceed the max weight.
this._bucketsTotalWeight += bucketItem.metric.weight;

if (this._bucketsTotalWeight >= MAX_WEIGHT) {
this.flush();
}
}

/**
* Flushes the current metrics to the transport via the transport.
*/
public flush(): void {
this._forceFlush = true;
this._flush();
}

/**
* Shuts down metrics aggregator and clears all metrics.
*/
public close(): void {
this._forceFlush = true;
clearInterval(this._interval);
this._flush();
}

/**
* Flushes the buckets according to the internal state of the aggregator.
* If it is a force flush, which happens on shutdown, it will flush all buckets.
* Otherwise, it will only flush buckets that are older than the flush interval,
* and according to the flush shift.
*
* This function mutates `_forceFlush` and `_bucketsTotalWeight` properties.
*/
private _flush(): void {
// TODO(@anonrig): Add Atomics for locking to avoid having force flush and regular flush
// running at the same time.
// Ref: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics

// This path eliminates the need for checking for timestamps since we're forcing a flush.
// Remember to reset the flag, or it will always flush all metrics.
if (this._forceFlush) {
this._forceFlush = false;
this._bucketsTotalWeight = 0;
this._captureMetrics(this._buckets);
this._buckets.clear();
return;
}
const cutoffSeconds = Math.floor(timestampInSeconds()) - DEFAULT_FLUSH_INTERVAL / 1000 - this._flushShift;
// TODO(@anonrig): Optimization opportunity.
// Convert this map to an array and store key in the bucketItem.
const flushedBuckets: MetricBucket = new Map();
for (const [key, bucket] of this._buckets) {
if (bucket.timestamp <= cutoffSeconds) {
flushedBuckets.set(key, bucket);
this._bucketsTotalWeight -= bucket.metric.weight;
}
}

for (const [key] of flushedBuckets) {
this._buckets.delete(key);
}

this._captureMetrics(flushedBuckets);
}

/**
* Only captures a subset of the buckets passed to this function.
* @param flushedBuckets
*/
private _captureMetrics(flushedBuckets: MetricBucket): void {
if (flushedBuckets.size > 0 && this._client.captureAggregateMetrics) {
// TODO(@anonrig): Optimization opportunity.
// This copy operation can be avoided if we store the key in the bucketItem.
const buckets = Array.from(flushedBuckets).map(([, bucketItem]) => bucketItem);
this._client.captureAggregateMetrics(buckets);
}
}
}
92 changes: 92 additions & 0 deletions packages/core/src/metrics/browser-aggregator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import type {
Client,
ClientOptions,
MeasurementUnit,
MetricBucketItem,
MetricsAggregator,
Primitive,
} from '@sentry/types';
import { timestampInSeconds } from '@sentry/utils';
import { DEFAULT_BROWSER_FLUSH_INTERVAL, NAME_AND_TAG_KEY_NORMALIZATION_REGEX } from './constants';
import { METRIC_MAP } from './instance';
import type { MetricBucket, MetricType } from './types';
import { getBucketKey, sanitizeTags } from './utils';

/**
* A simple metrics aggregator that aggregates metrics in memory and flushes them periodically.
* Default flush interval is 5 seconds.
*
* @experimental This API is experimental and might change in the future.
*/
export class BrowserMetricsAggregator implements MetricsAggregator {
// TODO(@anonrig): Use FinalizationRegistry to have a proper way of flushing the buckets
// when the aggregator is garbage collected.
// Ref: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/FinalizationRegistry
private _buckets: MetricBucket;
private readonly _interval: ReturnType<typeof setInterval>;

public constructor(private readonly _client: Client<ClientOptions>) {
this._buckets = new Map();
this._interval = setInterval(() => this.flush(), DEFAULT_BROWSER_FLUSH_INTERVAL);
}

/**
* @inheritDoc
*/
public add(
metricType: MetricType,
unsanitizedName: string,
value: number | string,
unit: MeasurementUnit | undefined = 'none',
unsanitizedTags: Record<string, Primitive> | undefined = {},
maybeFloatTimestamp: number | undefined = timestampInSeconds(),
): void {
const timestamp = Math.floor(maybeFloatTimestamp);
const name = unsanitizedName.replace(NAME_AND_TAG_KEY_NORMALIZATION_REGEX, '_');
const tags = sanitizeTags(unsanitizedTags);

const bucketKey = getBucketKey(metricType, name, unit, tags);
const bucketItem: MetricBucketItem | undefined = this._buckets.get(bucketKey);
if (bucketItem) {
bucketItem.metric.add(value);
// TODO(abhi): Do we need this check?
if (bucketItem.timestamp < timestamp) {
bucketItem.timestamp = timestamp;
}
} else {
this._buckets.set(bucketKey, {
// @ts-expect-error we don't need to narrow down the type of value here, saves bundle size.
metric: new METRIC_MAP[metricType](value),
timestamp,
metricType,
name,
unit,
tags,
});
}
}

/**
* @inheritDoc
*/
public flush(): void {
// short circuit if buckets are empty.
if (this._buckets.size === 0) {
return;
}
if (this._client.captureAggregateMetrics) {
// TODO(@anonrig): Use Object.values() when we support ES6+
const metricBuckets = Array.from(this._buckets).map(([, bucketItem]) => bucketItem);
this._client.captureAggregateMetrics(metricBuckets);
}
this._buckets.clear();
}

/**
* @inheritDoc
*/
public close(): void {
clearInterval(this._interval);
this.flush();
}
}
13 changes: 12 additions & 1 deletion packages/core/src/metrics/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,15 @@ export const TAG_VALUE_NORMALIZATION_REGEX = /[^\w\d_:/@.{}[\]$-]+/g;
* This does not match spec in https://develop.sentry.dev/sdk/metrics
* but was chosen to optimize for the most common case in browser environments.
*/
export const DEFAULT_FLUSH_INTERVAL = 5000;
export const DEFAULT_BROWSER_FLUSH_INTERVAL = 5000;

/**
* SDKs are required to bucket into 10 second intervals (rollup in seconds)
* which is the current lower bound of metric accuracy.
*/
export const DEFAULT_FLUSH_INTERVAL = 10000;

/**
* The maximum number of metrics that should be stored in memory.
*/
export const MAX_WEIGHT = 10000;
2 changes: 1 addition & 1 deletion packages/core/src/metrics/envelope.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export function createMetricEnvelope(
return createEnvelope<StatsdEnvelope>(headers, [item]);
}

function createMetricEnvelopeItem(metricBucketItems: Array<MetricBucketItem>): StatsdItem {
function createMetricEnvelopeItem(metricBucketItems: MetricBucketItem[]): StatsdItem {
const payload = serializeMetricBuckets(metricBucketItems);
const metricHeaders: StatsdItem[0] = {
type: 'statsd',
Expand Down
10 changes: 5 additions & 5 deletions packages/core/src/metrics/exports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ function addToMetricsAggregator(
metricType: MetricType,
name: string,
value: number | string,
data: MetricData = {},
data: MetricData | undefined = {},
): void {
const client = getClient<BaseClient<ClientOptions>>();
const scope = getCurrentScope();
Expand Down Expand Up @@ -49,7 +49,7 @@ function addToMetricsAggregator(
/**
* Adds a value to a counter metric
*
* @experimental This API is experimental and might having breaking changes in the future.
* @experimental This API is experimental and might have breaking changes in the future.
*/
export function increment(name: string, value: number = 1, data?: MetricData): void {
addToMetricsAggregator(COUNTER_METRIC_TYPE, name, value, data);
Expand All @@ -58,7 +58,7 @@ export function increment(name: string, value: number = 1, data?: MetricData): v
/**
* Adds a value to a distribution metric
*
* @experimental This API is experimental and might having breaking changes in the future.
* @experimental This API is experimental and might have breaking changes in the future.
*/
export function distribution(name: string, value: number, data?: MetricData): void {
addToMetricsAggregator(DISTRIBUTION_METRIC_TYPE, name, value, data);
Expand All @@ -67,7 +67,7 @@ export function distribution(name: string, value: number, data?: MetricData): vo
/**
* Adds a value to a set metric. Value must be a string or integer.
*
* @experimental This API is experimental and might having breaking changes in the future.
* @experimental This API is experimental and might have breaking changes in the future.
*/
export function set(name: string, value: number | string, data?: MetricData): void {
addToMetricsAggregator(SET_METRIC_TYPE, name, value, data);
Expand All @@ -76,7 +76,7 @@ export function set(name: string, value: number | string, data?: MetricData): vo
/**
* Adds a value to a gauge metric
*
* @experimental This API is experimental and might having breaking changes in the future.
* @experimental This API is experimental and might have breaking changes in the future.
*/
export function gauge(name: string, value: number, data?: MetricData): void {
addToMetricsAggregator(GAUGE_METRIC_TYPE, name, value, data);
Expand Down
Loading

0 comments on commit 7f8eca7

Please sign in to comment.