From 729d0030d7318e620cd57922c56a2abe3c58fcae Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 15 Aug 2023 06:33:32 +0000 Subject: [PATCH] Add Exemplars to metrics generated in aggregate processor Signed-off-by: Krishna Kondaka --- .../actions/CountAggregateAction.java | 18 ++++++++++ .../actions/HistogramAggregateAction.java | 33 +++++++++++++++++++ .../actions/CountAggregateActionTest.java | 3 ++ .../HistogramAggregateActionTests.java | 3 ++ 4 files changed, 57 insertions(+) diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java index 61a4ecce90..327d6101ee 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java @@ -6,6 +6,8 @@ package org.opensearch.dataprepper.plugins.processor.aggregate.actions; import org.opensearch.dataprepper.model.metric.JacksonSum; +import org.opensearch.dataprepper.model.metric.Exemplar; +import org.opensearch.dataprepper.model.metric.DefaultExemplar; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; @@ -19,6 +21,7 @@ import io.opentelemetry.proto.metrics.v1.AggregationTemporality; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.time.ZoneId; import java.time.format.DateTimeFormatter; @@ -42,12 +45,14 @@ public class CountAggregateAction implements AggregateAction { public final String startTimeKey; public final String outputFormat; private long startTimeNanos; + private List exemplarList; @DataPrepperPluginConstructor public CountAggregateAction(final CountAggregateActionConfig countAggregateActionConfig) { this.countKey = countAggregateActionConfig.getCountKey(); this.startTimeKey = countAggregateActionConfig.getStartTimeKey(); this.outputFormat = countAggregateActionConfig.getOutputFormat(); + this.exemplarList = new ArrayList<>(); } private long getTimeNanos(Instant time) { @@ -56,6 +61,17 @@ private long getTimeNanos(Instant time) { return currentTimeNanos; } + public Exemplar createExemplar(final Event event) { + long curTimeNanos = getTimeNanos(Instant.now()); + Map attributes = event.toMap(); + return new DefaultExemplar( + OTelProtoCodec.convertUnixNanosToISO8601(curTimeNanos), + 1.0, + event.get("spanId", String.class), // maybe null + event.get("traceId", String.class), // maybe null + attributes); + } + @Override public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) { final GroupState groupState = aggregateActionInput.getGroupState(); @@ -63,6 +79,7 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct groupState.put(startTimeKey, Instant.now()); groupState.putAll(aggregateActionInput.getIdentificationKeys()); groupState.put(countKey, 1); + exemplarList.add(createExemplar(event)); } else { Integer v = (Integer)groupState.get(countKey) + 1; groupState.put(countKey, v); @@ -98,6 +115,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA .withUnit(SUM_METRIC_UNIT) .withAggregationTemporality(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA.name()) .withValue((double)countValue) + .withExemplars(exemplarList) .withAttributes(attr) .build(false); event = (Event)sum; diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java index f535a10e7c..33cadcc483 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java @@ -7,6 +7,8 @@ import org.opensearch.dataprepper.model.metric.JacksonHistogram; import org.opensearch.dataprepper.model.metric.Bucket; +import org.opensearch.dataprepper.model.metric.Exemplar; +import org.opensearch.dataprepper.model.metric.DefaultExemplar; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; @@ -25,6 +27,7 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Map; +import java.util.Objects; import java.util.List; import java.util.HashMap; import java.util.Arrays; @@ -53,6 +56,11 @@ public class HistogramAggregateAction implements AggregateAction { private final String key; private final String units; private final boolean recordMinMax; + private List exemplarList; + private Event minEvent; + private Event maxEvent; + private double minValue; + private double maxValue; private long startTimeNanos; private double[] buckets; @@ -62,6 +70,7 @@ public HistogramAggregateAction(final HistogramAggregateActionConfig histogramAg this.key = histogramAggregateActionConfig.getKey(); List bucketList = histogramAggregateActionConfig.getBuckets(); this.buckets = new double[bucketList.size()+2]; + this.exemplarList = new ArrayList<>(); int bucketIdx = 0; this.buckets[bucketIdx++] = -Float.MAX_VALUE; for (int i = 0; i < bucketList.size(); i++) { @@ -101,6 +110,19 @@ private double convertToDouble(Number value) { return doubleValue; } + public Exemplar createExemplar(final String id, final Event event, double value) { + long curTimeNanos = getTimeNanos(Instant.now()); + Map attributes = event.toMap(); + if (Objects.nonNull(id)) { + attributes.put("exemplar_id", id); + } + return new DefaultExemplar(OTelProtoCodec.convertUnixNanosToISO8601(curTimeNanos), + value, + event.get("spanId", String.class), // maybe null + event.get("traceId", String.class), // maybe null + attributes); + } + @Override public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) { final GroupState groupState = aggregateActionInput.getGroupState(); @@ -126,6 +148,10 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct if (this.recordMinMax) { groupState.put(minKey, doubleValue); groupState.put(maxKey, doubleValue); + minEvent = event; + maxEvent = event; + minValue = doubleValue; + maxValue = doubleValue; } } else { Integer v = (Integer)groupState.get(countKey) + 1; @@ -138,10 +164,14 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct double min = (double)groupState.get(minKey); if (doubleValue < min) { groupState.put(minKey, doubleValue); + minEvent = event; + minValue = doubleValue; } double max = (double)groupState.get(maxKey); if (doubleValue > max) { groupState.put(maxKey, doubleValue); + maxEvent = event; + maxValue = doubleValue; } } } @@ -159,6 +189,8 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA long startTimeNanos = getTimeNanos(startTime); long endTimeNanos = getTimeNanos(endTime); String histogramKey = HISTOGRAM_METRIC_NAME + "_key"; + exemplarList.add(createExemplar("min", minEvent, minValue)); + exemplarList.add(createExemplar("max", maxEvent, maxValue)); if (outputFormat.equals(OutputFormat.RAW.toString())) { groupState.put(histogramKey, key); groupState.put(durationKey, endTimeNanos-startTimeNanos); @@ -203,6 +235,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA .withBuckets(buckets) .withBucketCountsList(bucketCounts) .withExplicitBoundsList(explicitBoundsList) + .withExemplars(exemplarList) .withAttributes(attr) .build(false); event = (Event)histogram; diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java index 9e35507a2a..0f96a83c41 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.metric.JacksonMetric; +import org.opensearch.dataprepper.model.metric.Exemplar; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -108,5 +109,7 @@ void testCountAggregateOTelFormat(int testCount) { assertThat(metric.toJsonString().indexOf("attributes"), not(-1)); assertThat(result.get(0).toMap(), hasKey("startTime")); assertThat(result.get(0).toMap(), hasKey("time")); + List exemplars = (List )result.get(0).toMap().get("exemplars"); + assertThat(exemplars.size(), equalTo(1)); } } diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionTests.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionTests.java index 6e9d783998..b2b498306b 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionTests.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionTests.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.metric.JacksonMetric; +import org.opensearch.dataprepper.model.metric.Exemplar; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -207,6 +208,8 @@ void testHistogramAggregateOTelFormat(int testCount) throws NoSuchFieldException assertThat(expectedBucketCounts[i], equalTo(bucketCountsFromResult.get(i))); } assertThat(((Map)result.get(0).toMap().get("attributes")), hasEntry(HistogramAggregateAction.HISTOGRAM_METRIC_NAME+"_key", testKey)); + List exemplars = (List )result.get(0).toMap().get("exemplars"); + assertThat(exemplars.size(), equalTo(2)); assertThat(((Map)result.get(0).toMap().get("attributes")), hasEntry(dataKey, dataValue)); final String expectedDurationKey = histogramAggregateActionConfig.getDurationKey(); assertThat(((Map)result.get(0).toMap().get("attributes")), hasKey(expectedDurationKey));