Skip to content

Commit

Permalink
KAFKA-15972: Add support to exclude labels for telemetry metrics (KIP…
Browse files Browse the repository at this point in the history
…-714) (apache#14924)

Changes in the PR are to support excluding the client_id label when sending telemetry metrics.

Some of the labels/tags present in a metric should be skipped while collecting telemetry, as the data might already be known to the broker; hence, we should minimize the data transfer. One such label is client_id, which is already present in the RequestContext, so the broker can append that label prior to emitting metrics to the telemetry backend.

Reviewers: Andrew Schofield <[email protected]>, Walker Carlson <[email protected]>
  • Loading branch information
apoorvmittal10 authored Dec 6, 2023
1 parent a89b648 commit 71b4cba
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -103,6 +104,13 @@
public class ClientTelemetryReporter implements MetricsReporter {

private static final Logger log = LoggerFactory.getLogger(ClientTelemetryReporter.class);
/*
Exclude client_id from the labels as the broker already knows the client_id from the request
context. These additional labels from the request context should be added by the broker prior
to exporting the metrics to the telemetry backend.
*/
private static final Set<String> EXCLUDE_LABELS = Collections.singleton("client_id");

public static final int DEFAULT_PUSH_INTERVAL_MS = 5 * 60 * 1000;

private final ClientTelemetryProvider telemetryProvider;
Expand Down Expand Up @@ -226,7 +234,7 @@ public ClientTelemetrySender telemetrySender() {
private void initCollectors() {
kafkaMetricsCollector = new KafkaMetricsCollector(
TelemetryMetricNamingConvention.getClientTelemetryMetricNamingStrategy(
telemetryProvider.domain()));
telemetryProvider.domain()), EXCLUDE_LABELS);
}

private ResourceMetrics buildMetric(Metric metric) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public class KafkaMetricsCollector implements MetricsCollector {
private final StateLedger ledger;
private final Time time;
private final MetricNamingStrategy<MetricName> metricNamingStrategy;
private final Set<String> excludeLabels;

private static final Field METRIC_VALUE_PROVIDER_FIELD;

Expand All @@ -141,15 +142,16 @@ public class KafkaMetricsCollector implements MetricsCollector {
}
}

public KafkaMetricsCollector(MetricNamingStrategy<MetricName> metricNamingStrategy) {
this(metricNamingStrategy, Time.SYSTEM);
public KafkaMetricsCollector(MetricNamingStrategy<MetricName> metricNamingStrategy, Set<String> excludeLabels) {
this(metricNamingStrategy, Time.SYSTEM, excludeLabels);
}

// Visible for testing
KafkaMetricsCollector(MetricNamingStrategy<MetricName> metricNamingStrategy, Time time) {
KafkaMetricsCollector(MetricNamingStrategy<MetricName> metricNamingStrategy, Time time, Set<String> excludeLabels) {
this.metricNamingStrategy = metricNamingStrategy;
this.time = time;
this.ledger = new StateLedger();
this.excludeLabels = excludeLabels;
}

public void init(List<KafkaMetric> metrics) {
Expand Down Expand Up @@ -241,11 +243,11 @@ private void collectSum(MetricKey metricKey, double value, MetricsEmitter metric

metricsEmitter.emitMetric(
SinglePointMetric.deltaSum(metricKey, instantAndValue.getValue(), true, timestamp,
instantAndValue.getIntervalStart())
instantAndValue.getIntervalStart(), excludeLabels)
);
} else {
metricsEmitter.emitMetric(
SinglePointMetric.sum(metricKey, value, true, timestamp, ledger.instantAdded(metricKey))
SinglePointMetric.sum(metricKey, value, true, timestamp, ledger.instantAdded(metricKey), excludeLabels)
);
}
}
Expand All @@ -256,7 +258,7 @@ private void collectGauge(MetricKey metricKey, Number value, MetricsEmitter metr
}

metricsEmitter.emitMetric(
SinglePointMetric.gauge(metricKey, value, timestamp)
SinglePointMetric.gauge(metricKey, value, timestamp, excludeLabels)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -52,48 +53,48 @@ public Metric.Builder builder() {
/*
Methods to construct gauge metric type.
*/
public static SinglePointMetric gauge(MetricKey metricKey, Number value, Instant timestamp) {
public static SinglePointMetric gauge(MetricKey metricKey, Number value, Instant timestamp, Set<String> excludeLabels) {
NumberDataPoint.Builder point = point(timestamp, value);
return gauge(metricKey, point);
return gauge(metricKey, point, excludeLabels);
}

public static SinglePointMetric gauge(MetricKey metricKey, double value, Instant timestamp) {
public static SinglePointMetric gauge(MetricKey metricKey, double value, Instant timestamp, Set<String> excludeLabels) {
NumberDataPoint.Builder point = point(timestamp, value);
return gauge(metricKey, point);
return gauge(metricKey, point, excludeLabels);
}

/*
Methods to construct sum metric type.
*/

public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp) {
return sum(metricKey, value, monotonic, timestamp, null);
public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp, Set<String> excludeLabels) {
return sum(metricKey, value, monotonic, timestamp, null, excludeLabels);
}

public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp,
Instant startTimestamp) {
Instant startTimestamp, Set<String> excludeLabels) {
NumberDataPoint.Builder point = point(timestamp, value);
if (startTimestamp != null) {
point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
}

return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point);
return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point, excludeLabels);
}

public static SinglePointMetric deltaSum(MetricKey metricKey, double value, boolean monotonic,
Instant timestamp, Instant startTimestamp) {
Instant timestamp, Instant startTimestamp, Set<String> excludeLabels) {
NumberDataPoint.Builder point = point(timestamp, value)
.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));

return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, monotonic, point);
return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, monotonic, point, excludeLabels);
}

/*
Helper methods to support metric construction.
*/
private static SinglePointMetric sum(MetricKey metricKey, AggregationTemporality aggregationTemporality,
boolean monotonic, NumberDataPoint.Builder point) {
point.addAllAttributes(asAttributes(metricKey.tags()));
boolean monotonic, NumberDataPoint.Builder point, Set<String> excludeLabels) {
point.addAllAttributes(asAttributes(metricKey.tags(), excludeLabels));

Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
metric
Expand All @@ -104,8 +105,8 @@ private static SinglePointMetric sum(MetricKey metricKey, AggregationTemporality
return new SinglePointMetric(metricKey, metric);
}

private static SinglePointMetric gauge(MetricKey metricKey, NumberDataPoint.Builder point) {
point.addAllAttributes(asAttributes(metricKey.tags()));
private static SinglePointMetric gauge(MetricKey metricKey, NumberDataPoint.Builder point, Set<String> excludeLabels) {
point.addAllAttributes(asAttributes(metricKey.tags(), excludeLabels));

Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
metric.getGaugeBuilder().addDataPoints(point);
Expand All @@ -132,8 +133,8 @@ private static NumberDataPoint.Builder point(Instant timestamp, double value) {
.setAsDouble(value);
}

private static Iterable<KeyValue> asAttributes(Map<String, String> labels) {
return labels.entrySet().stream().map(
private static Iterable<KeyValue> asAttributes(Map<String, String> labels, Set<String> excludeLabels) {
return labels.entrySet().stream().filter(entry -> !excludeLabels.contains(entry.getKey())).map(
entry -> KeyValue.newBuilder()
.setKey(entry.getKey())
.setValue(AnyValue.newBuilder().setStringValue(entry.getValue())).build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ public void testEmitMetric() {
Collections.singletonList("name"));
ClientTelemetryEmitter emitter = new ClientTelemetryEmitter(selector, true);

SinglePointMetric gauge = SinglePointMetric.gauge(metricKey, Long.valueOf(1), now);
SinglePointMetric sum = SinglePointMetric.sum(metricKey, 1.0, true, now);
SinglePointMetric gauge = SinglePointMetric.gauge(metricKey, Long.valueOf(1), now, Collections.emptySet());
SinglePointMetric sum = SinglePointMetric.sum(metricKey, 1.0, true, now, Collections.emptySet());
assertTrue(emitter.emitMetric(gauge));
assertTrue(emitter.emitMetric(sum));

MetricKey anotherKey = new MetricKey("io.name", Collections.emptyMap());
assertFalse(emitter.emitMetric(SinglePointMetric.gauge(anotherKey, Long.valueOf(1), now)));
assertFalse(emitter.emitMetric(SinglePointMetric.gauge(anotherKey, Long.valueOf(1), now, Collections.emptySet())));

assertEquals(2, emitter.emittedMetrics().size());
assertEquals(Arrays.asList(gauge, sum), emitter.emittedMetrics());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -72,7 +73,8 @@ public void setUp() {
// Define collector to test.
collector = new KafkaMetricsCollector(
metricNamingStrategy,
time
time,
Collections.emptySet()
);

// Add reporter to metrics.
Expand Down Expand Up @@ -564,6 +566,70 @@ public void testCollectMetricsWithTemporalityChange() {
Instant.ofEpochSecond(301L).getNano(), point.getStartTimeUnixNano());
}

@Test
public void testCollectMetricsWithExcludeLabels() {
collector = new KafkaMetricsCollector(
metricNamingStrategy,
time,
Collections.singleton("tag2")
);

tags = new HashMap<>();
tags.put("tag1", "value1");
tags.put("tag2", "value2");

// Gauge metric.
MetricName name1 = metrics.metricName("nonMeasurable", "group1", tags);
metrics.addMetric(name1, (Gauge<Double>) (config, now) -> 99d);
// Sum metric.
MetricName name2 = metrics.metricName("counter", "group1", tags);
Sensor sensor = metrics.sensor("counter");
sensor.add(name2, new WindowedCount());
sensor.record();
testEmitter.reconfigurePredicate(k -> !k.key().name().endsWith(".count"));

// Collect sum metrics
collector.collect(testEmitter);
List<SinglePointMetric> result = testEmitter.emittedMetrics();
Metric metric = result.stream()
.flatMap(metrics -> Stream.of(metrics.builder().build()))
.filter(m -> m.getName().equals("test.domain.group1.nonmeasurable")).findFirst().get();

assertEquals(1, metric.getGauge().getDataPointsCount());
NumberDataPoint point = metric.getGauge().getDataPoints(0);
assertEquals(1, point.getAttributesCount());
assertEquals("tag1", point.getAttributes(0).getKey());
assertEquals("value1", point.getAttributes(0).getValue().getStringValue());

metric = result.stream()
.flatMap(metrics -> Stream.of(metrics.builder().build()))
.filter(m -> m.getName().equals("test.domain.group1.counter")).findFirst().get();

assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, metric.getSum().getAggregationTemporality());
assertEquals(1, metric.getSum().getDataPointsCount());
point = metric.getSum().getDataPoints(0);
assertEquals(1, point.getAttributesCount());
assertEquals("tag1", point.getAttributes(0).getKey());
assertEquals("value1", point.getAttributes(0).getValue().getStringValue());

testEmitter.reset();
testEmitter.onlyDeltaMetrics(true);
collector.collect(testEmitter);
result = testEmitter.emittedMetrics();

// Delta metrics.
metric = result.stream()
.flatMap(metrics -> Stream.of(metrics.builder().build()))
.filter(m -> m.getName().equals("test.domain.group1.counter")).findFirst().get();

assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, metric.getSum().getAggregationTemporality());
assertEquals(1, metric.getSum().getDataPointsCount());
point = metric.getSum().getDataPoints(0);
assertEquals(1, point.getAttributesCount());
assertEquals("tag1", point.getAttributes(0).getKey());
assertEquals("value1", point.getAttributes(0).getValue().getStringValue());
}

private MetricsReporter getTestMetricsReporter() {
// Inline implementation of MetricsReporter for testing.
return new MetricsReporter() {
Expand Down
Loading

0 comments on commit 71b4cba

Please sign in to comment.