Skip to content

Commit

Permalink
chore: add views for host-metrics (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-luna authored Feb 9, 2024
1 parent 969b450 commit 22d32ae
Show file tree
Hide file tree
Showing 6 changed files with 400 additions and 7 deletions.
3 changes: 2 additions & 1 deletion packages/mockotlpserver/lib/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
const {MockOtlpServer} = require('./mockotlpserver');
const {normalizeTrace} = require('./normalize');
const {normalizeTrace, normalizeMetrics} = require('./normalize');

module.exports = {
MockOtlpServer,
normalizeTrace,
normalizeMetrics,
};
245 changes: 245 additions & 0 deletions packages/opentelemetry-node/lib/metrics/host.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
/**
 * NOTICE: the only purpose of all the code below is to provide a new `View`
 * that does a simple aggregation (a trim in this case) for the
 * `system.cpu.utilization` metric.
 * This is too much code for a simple trim of data:
 * - `@opentelemetry/sdk-metrics` should offer an easier way to do it and
 *   should also export all the necessary building blocks, like the
 *   `LastValueAccumulation` class and the `AggregatorKind` enum
 * - we might look for an alternate solution in this case (no proposal for now)
 */
/**
* @typedef {import('@opentelemetry/api').HrTime} HrTime
*/
/**
* @template T
* @typedef {import('@opentelemetry/sdk-metrics/build/src/aggregator/types').Aggregator<T>} Aggregator<T>
*/
/**
* @template T
* @typedef {import('@opentelemetry/sdk-metrics/build/src/aggregator/types').AccumulationRecord<T>} AccumulationRecord<T>
*/
/**
* @typedef {import('@opentelemetry/sdk-metrics').LastValueAggregation} LastValueAggregation
* @typedef {import('@opentelemetry/sdk-metrics/build/src/aggregator/types').Accumulation} Accumulation
* @typedef {import('@opentelemetry/sdk-metrics').MetricDescriptor} MetricDescriptor
* @typedef {import('@opentelemetry/sdk-metrics').AggregationTemporality} AggregationTemporality
* @typedef {import('@opentelemetry/sdk-metrics').GaugeMetricData} GaugeMetricData
*/

const {millisToHrTime, hrTimeToMicroseconds} = require('@opentelemetry/core');
const {
Aggregation,
DataPointType,
View,
} = require('@opentelemetry/sdk-metrics');
const {HostMetrics} = require('@opentelemetry/host-metrics');

/**
 * Accumulation that only remembers the last value recorded and the time at
 * which that value was sampled.
 *
 * Copied from `@opentelemetry/sdk-metrics` since it's not exported
 * https://github.com/open-telemetry/opentelemetry-js/blob/f86251d40fbf615be87319c8a1f5643afb820076/packages/sdk-metrics/src/aggregator/LastValue.ts#L34
 *
 * @todo remove this class and require it when exported
 * @class
 * @implements {Accumulation}
 */
class LastValueAccumulation {
    /**
     * @param {HrTime} startTime start of the period this accumulation covers
     * @param {number} [current] initial value; any falsy value (including
     *   an omitted argument) is stored as 0
     * @param {HrTime} [sampleTime] time of the initial value; when omitted
     *   the current time (`Date.now()`) is used
     */
    constructor(startTime, current, sampleTime) {
        this.startTime = startTime;
        this._current = current ? current : 0;
        this.sampleTime = sampleTime
            ? sampleTime
            : millisToHrTime(Date.now());
    }

    /**
     * Replaces the stored value and stamps it with the current time.
     *
     * @param {number} value
     */
    record(value) {
        this._current = value;
        this.sampleTime = millisToHrTime(Date.now());
    }

    /**
     * Overrides the start time given at construction.
     *
     * @param {HrTime} startTime
     */
    setStartTime(startTime) {
        this.startTime = startTime;
    }

    /**
     * @returns {number} the last value stored in this accumulation
     */
    toPointValue() {
        return this._current;
    }
}

/**
 * Aggregator for the `system.cpu.utilization` metric. It keeps the last
 * value per attribute set and, when producing metric data, emits only the
 * data points derived from the `idle` state (as `1 - idle`), without the
 * `system.cpu.state` attribute.
 *
 * @class
 * @implements {Aggregator<LastValueAccumulation>}
 */
class SystemCpuUtilizationAggregator {
    // TODO: hardcoded the value of `AggregatorKind` enum for GAUGE
    // remove when exported
    // https://github.com/open-telemetry/opentelemetry-js/blob/f86251d40fbf615be87319c8a1f5643afb820076/packages/sdk-metrics/src/aggregator/types.ts#L23
    kind = 2;

    /**
     * Creates an empty accumulation starting at the given time.
     *
     * @param {HrTime} startTime
     * @returns {LastValueAccumulation}
     */
    createAccumulation(startTime) {
        return new LastValueAccumulation(startTime);
    }

    /**
     * Returns a new accumulation holding the most recently sampled value of
     * the two inputs (`delta` wins on a tie), keeping the start time of
     * `previous`.
     *
     * @param {LastValueAccumulation} previous
     * @param {LastValueAccumulation} delta
     * @returns {LastValueAccumulation}
     */
    merge(previous, delta) {
        // Compare in microseconds: nanosecond values may lose precision.
        const latestAccumulation =
            hrTimeToMicroseconds(delta.sampleTime) >=
            hrTimeToMicroseconds(previous.sampleTime)
                ? delta
                : previous;
        return new LastValueAccumulation(
            previous.startTime,
            latestAccumulation.toPointValue(),
            latestAccumulation.sampleTime
        );
    }

    /**
     * A delta aggregation is not meaningful to SystemCpuUtilizationAggregator,
     * so no difference is computed: this returns a new accumulation holding
     * the most recently sampled value of the two inputs (`current` wins on a
     * tie), keeping the start time of `current`.
     *
     * @param {LastValueAccumulation} previous
     * @param {LastValueAccumulation} current
     * @returns {LastValueAccumulation}
     */
    diff(previous, current) {
        // Compare in microseconds: nanosecond values may lose precision.
        const latestAccumulation =
            hrTimeToMicroseconds(current.sampleTime) >=
            hrTimeToMicroseconds(previous.sampleTime)
                ? current
                : previous;
        return new LastValueAccumulation(
            current.startTime,
            latestAccumulation.toPointValue(),
            latestAccumulation.sampleTime
        );
    }

    /**
     * Produces one data point per `idle` record, with the value
     * `1 - idle_value` and without the `system.cpu.state` attribute, so we
     * get the total utilization per CPU (`system.cpu.logical_number`).
     * As an example given the data points:
     * - { value: 0.1, attributes: { 'system.cpu.logical_number': 0, 'system.cpu.state': 'idle' } }
     * - { value: 0.5, attributes: { 'system.cpu.logical_number': 0, 'system.cpu.state': 'system' } }
     * - { value: 0.2, attributes: { 'system.cpu.logical_number': 0, 'system.cpu.state': 'user' } }
     * - { value: 0.1, attributes: { 'system.cpu.logical_number': 0, 'system.cpu.state': 'nice' } }
     * - { value: 0.1, attributes: { 'system.cpu.logical_number': 0, 'system.cpu.state': 'interrupt' } }
     * - { value: 0.2, attributes: { 'system.cpu.logical_number': 1, 'system.cpu.state': 'idle' } }
     * - { value: 0.4, attributes: { 'system.cpu.logical_number': 1, 'system.cpu.state': 'system' } }
     * - { value: 0.1, attributes: { 'system.cpu.logical_number': 1, 'system.cpu.state': 'user' } }
     * - { value: 0.1, attributes: { 'system.cpu.logical_number': 1, 'system.cpu.state': 'nice' } }
     * - { value: 0.2, attributes: { 'system.cpu.logical_number': 1, 'system.cpu.state': 'interrupt' } }
     *
     * the aggregator will send
     * - { value: 0.9, attributes: { 'system.cpu.logical_number': 0 } }
     * - { value: 0.8, attributes: { 'system.cpu.logical_number': 1 } }
     *
     * The attribute objects received in `accumulationByAttributes` are not
     * modified: each data point gets its own copy.
     *
     * @param {MetricDescriptor} descriptor
     * @param {AggregationTemporality} aggregationTemporality
     * @param {AccumulationRecord<LastValueAccumulation>[]} accumulationByAttributes
     * @param {HrTime} endTime
     * @returns {GaugeMetricData | undefined}
     */
    toMetricData(
        descriptor,
        aggregationTemporality,
        accumulationByAttributes,
        endTime
    ) {
        // We cannot sum up the utilization of all the states since `os.cpus()` is
        // not returning all of the possible states but limited to: user, nice, sys, idle, irq
        // https://nodejs.org/api/all.html#all_os_oscpus
        //
        // where in linux we have more: user, nice, system, idle, iowait, irq, softirq, steal, guest, guest_nice
        // https://man7.org/linux/man-pages/man5/proc.5.html
        //
        // So in order to have the most accurate metric of utilization we use
        // the formula 1 - (idle utilization)
        const dataPoints = accumulationByAttributes
            .filter(
                ([attributes]) => attributes['system.cpu.state'] === 'idle'
            )
            .map(([attributes, accumulation]) => {
                // Work on a shallow copy: the caller owns `attributes`, and
                // removing the state from the original would make this record
                // fail the `idle` filter above if the same object is passed
                // in again.
                const pointAttributes = {...attributes};
                delete pointAttributes['system.cpu.state'];
                return {
                    attributes: pointAttributes,
                    startTime: accumulation.startTime,
                    endTime,
                    value: 1 - accumulation.toPointValue(),
                };
            });

        return {
            descriptor,
            aggregationTemporality,
            dataPointType: DataPointType.GAUGE,
            dataPoints,
        };
    }
}

/**
 * Aggregation whose aggregator is always a `SystemCpuUtilizationAggregator`.
 */
class SystemCpuUtilizationAggregation extends Aggregation {
    /**
     * @param {unknown} instrument not used by this aggregation
     * @returns {SystemCpuUtilizationAggregator} a fresh aggregator per call
     */
    createAggregator(instrument) {
        const aggregator = new SystemCpuUtilizationAggregator();
        return aggregator;
    }
}

/**
 * Last `HostMetrics` instance created by `enableHostMetrics`.
 *
 * @type {HostMetrics}
 */
let hostMetricsInstance;

/**
 * Creates a `HostMetrics` instance, stores it in `hostMetricsInstance` and
 * starts it.
 */
function enableHostMetrics() {
    // TODO: make this configurable, user might collect host metrics with a separate utility
    const hostMetricsConfig = {name: ''};
    hostMetricsInstance = new HostMetrics(hostMetricsConfig);
    hostMetricsInstance.start();
}

/**
 * Views for the host metrics: two instruments are dropped and
 * `system.cpu.utilization` goes through the custom aggregation.
 *
 * @type {View[]}
 */
const HOST_METRICS_VIEWS = [
    // drop `system.network.*` metrics for now, and `system.cpu.time` also
    // TODO: check if we can do an aggregation for `system.cpu.time`
    ...['system.network.*', 'system.cpu.time'].map(
        (instrumentName) =>
            new View({
                instrumentName,
                aggregation: Aggregation.Drop(),
            })
    ),
    // use the aggregation we created above
    new View({
        instrumentName: 'system.cpu.utilization',
        aggregation: new SystemCpuUtilizationAggregation(),
    }),
];

// Public API of this module: the list of views for host metrics and the
// function that creates and starts the `HostMetrics` instance.
module.exports = {
HOST_METRICS_VIEWS,
enableHostMetrics,
};
12 changes: 7 additions & 5 deletions packages/opentelemetry-node/lib/sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ const {HttpInstrumentation} = require('@opentelemetry/instrumentation-http');
const {
ExpressInstrumentation,
} = require('@opentelemetry/instrumentation-express');
const {HostMetrics} = require('@opentelemetry/host-metrics');
const {BatchLogRecordProcessor} = require('@opentelemetry/sdk-logs');

const {setupLogger} = require('./logging');
const {distroDetectorSync} = require('./detector');
const {setupEnvironment, restoreEnvironment} = require('./environment');

const {enableHostMetrics, HOST_METRICS_VIEWS} = require('./metrics/host');

/**
* @typedef {Partial<import('@opentelemetry/sdk-node').NodeSDKConfiguration>} PartialNodeSDKConfiguration
*/
Expand Down Expand Up @@ -80,6 +81,10 @@ class ElasticNodeSDK extends NodeSDK {
exportIntervalMillis: metricsInterval,
exportTimeoutMillis: metricsInterval, // TODO same val appropriate for timeout?
});
defaultConfig.views = [
// Add views for `host-metrics` to avoid excess of data being sent to the server
...HOST_METRICS_VIEWS,
];
}

const configuration = Object.assign(defaultConfig, opts);
Expand All @@ -102,10 +107,7 @@ class ElasticNodeSDK extends NodeSDK {

if (!this._metricsDisabled) {
// TODO: make this configurable, user might collect host metrics with a separate utility
this._hostMetrics = new HostMetrics({
name: '', // Use empty string to satisfy types, but get default.
});
this._hostMetrics.start();
enableHostMetrics();
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions packages/opentelemetry-node/test/fixtures/use-host-metrics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
const {createHash} = require('crypto');

// Fallback interval used when `ETEL_METRICS_INTERVAL_MS` is unset or is not
// a number. It matches the value `host-metrics.test.js` passes to this fixture.
const DEFAULT_EXPORT_INTERVAL_MS = 100;

// Environment variables are strings (or undefined), so convert explicitly.
// Without the fallback an unset variable gives `exportTime = NaN`; the
// `exportTime < Date.now()` check below would then never be true and the
// interval would never be cleared, keeping the process alive forever.
const parsedInterval = Number(process.env.ETEL_METRICS_INTERVAL_MS);
const exportInterval = Number.isFinite(parsedInterval)
    ? parsedInterval
    : DEFAULT_EXPORT_INTERVAL_MS;

// Keep working for one and a half export intervals so at least one export
// happens while the process is busy.
const exportTime = Date.now() + exportInterval * 1.5;

// Do some operations to get some CPU usage
const timerId = setInterval(() => {
    if (exportTime < Date.now()) {
        clearInterval(timerId);
        return;
    }

    const hash = createHash('sha256');
    hash.update(new Array(100).fill(Math.random()).join(','));
    console.log(hash.digest('hex'));
}, 10);
77 changes: 77 additions & 0 deletions packages/opentelemetry-node/test/host-metrics.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Test that the views set by ElasticNodeSdk work as expected
// - dropping data points for `system.network.*` metrics
// - dropping data points for `system.cpu.time` metric
// - aggregating data points for `system.cpu.utilization` metric

const {test} = require('tape');
const {runTestFixtures} = require('./testutils');

/**
 * Fixtures for the host-metrics views: the fixture script is run with the
 * SDK preloaded and the collected metrics are checked in `checkTelemetry`.
 *
 * @type {import('./testutils').TestFixture[]}
 */
const testFixtures = [
    {
        name: 'host metrics default views',
        args: ['./fixtures/use-host-metrics.js'],
        cwd: __dirname,
        env: {
            NODE_OPTIONS: '--require=../start.js',
            // TODO: this shall change in the future
            // we give a small number so we export the metrics while the
            // server is waiting/processing requests
            ETEL_METRICS_INTERVAL_MS: 100,
        },
        checkTelemetry: (t, collector) => {
            const metrics = collector.metrics;
            const networkMetrics = metrics.filter((metric) =>
                metric.name.startsWith('system.network')
            );
            const cpuTimeMetrics = metrics.filter(
                (metric) => metric.name === 'system.cpu.time'
            );
            const cpuUtilizationMetrics = metrics.filter(
                (metric) => metric.name === 'system.cpu.utilization'
            );

            t.ok(
                networkMetrics.length === 0,
                'system.network.* metrics are dropped'
            );
            t.ok(
                cpuTimeMetrics.length === 0,
                'system.cpu.time metric is dropped'
            );
            cpuUtilizationMetrics.forEach((metric) => {
                t.ok(
                    metric.gauge,
                    'data points are present in system.cpu.utilization metric'
                );
                // The value is `1 - idle`, so both bounds are legitimate:
                // check the whole closed range the message promises.
                t.ok(
                    metric.gauge?.dataPoints.every(
                        (dp) => dp.asDouble >= 0 && dp.asDouble <= 1
                    ),
                    '"system.cpu.utilization" data points have a value between 0-1'
                );
                // `filter()` returns an array, which is always truthy, so it
                // cannot be used as the assertion value; use `every()` to
                // actually verify no data point carries the attribute.
                t.ok(
                    metric.gauge?.dataPoints.every(
                        (dp) =>
                            !(
                                dp.attributes &&
                                dp.attributes['system.cpu.state']
                            )
                    ),
                    'data points have no "system.cpu.state" attribute'
                );
                // One data point per CPU: the number of distinct logical
                // numbers must equal the number of data points.
                t.equal(
                    new Set(
                        metric.gauge?.dataPoints.map(
                            (dp) =>
                                dp.attributes &&
                                dp.attributes['system.cpu.logical_number']
                        )
                    ).size,
                    metric.gauge?.dataPoints.length,
                    'data points have different "system.cpu.logical_number"'
                );
            });
        },
    },
];

// Runs every fixture declared in `testFixtures` under the 'host metrics'
// suite and then ends the suite.
test('host metrics', function hostMetricsSuite(suite) {
    runTestFixtures(suite, testFixtures);
    suite.end();
});
Loading

0 comments on commit 22d32ae

Please sign in to comment.