Skip to content

Commit

Permalink
Add Exemplars to metrics generated in aggregate processor
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Aug 15, 2023
1 parent 42e274d commit 729d003
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.dataprepper.plugins.processor.aggregate.actions;

import org.opensearch.dataprepper.model.metric.JacksonSum;
import org.opensearch.dataprepper.model.metric.Exemplar;
import org.opensearch.dataprepper.model.metric.DefaultExemplar;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
Expand All @@ -19,6 +21,7 @@
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
Expand All @@ -42,12 +45,14 @@ public class CountAggregateAction implements AggregateAction {
public final String startTimeKey;
public final String outputFormat;
private long startTimeNanos;
private List<Exemplar> exemplarList;

@DataPrepperPluginConstructor
public CountAggregateAction(final CountAggregateActionConfig countAggregateActionConfig) {
this.countKey = countAggregateActionConfig.getCountKey();
this.startTimeKey = countAggregateActionConfig.getStartTimeKey();
this.outputFormat = countAggregateActionConfig.getOutputFormat();
this.exemplarList = new ArrayList<>();
}

private long getTimeNanos(Instant time) {
Expand All @@ -56,13 +61,25 @@ private long getTimeNanos(Instant time) {
return currentTimeNanos;
}

public Exemplar createExemplar(final Event event) {
long curTimeNanos = getTimeNanos(Instant.now());
Map<String, Object> attributes = event.toMap();
return new DefaultExemplar(
OTelProtoCodec.convertUnixNanosToISO8601(curTimeNanos),
1.0,
event.get("spanId", String.class), // maybe null
event.get("traceId", String.class), // maybe null
attributes);
}

@Override
public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) {
final GroupState groupState = aggregateActionInput.getGroupState();
if (groupState.get(countKey) == null) {
groupState.put(startTimeKey, Instant.now());
groupState.putAll(aggregateActionInput.getIdentificationKeys());
groupState.put(countKey, 1);
exemplarList.add(createExemplar(event));
} else {
Integer v = (Integer)groupState.get(countKey) + 1;
groupState.put(countKey, v);
Expand Down Expand Up @@ -98,6 +115,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.withUnit(SUM_METRIC_UNIT)
.withAggregationTemporality(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA.name())
.withValue((double)countValue)
.withExemplars(exemplarList)
.withAttributes(attr)
.build(false);
event = (Event)sum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import org.opensearch.dataprepper.model.metric.JacksonHistogram;
import org.opensearch.dataprepper.model.metric.Bucket;
import org.opensearch.dataprepper.model.metric.Exemplar;
import org.opensearch.dataprepper.model.metric.DefaultExemplar;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
Expand All @@ -25,6 +27,7 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Objects;
import java.util.List;
import java.util.HashMap;
import java.util.Arrays;
Expand Down Expand Up @@ -53,6 +56,11 @@ public class HistogramAggregateAction implements AggregateAction {
private final String key;
private final String units;
private final boolean recordMinMax;
private List<Exemplar> exemplarList;
private Event minEvent;
private Event maxEvent;
private double minValue;
private double maxValue;

private long startTimeNanos;
private double[] buckets;
Expand All @@ -62,6 +70,7 @@ public HistogramAggregateAction(final HistogramAggregateActionConfig histogramAg
this.key = histogramAggregateActionConfig.getKey();
List<Number> bucketList = histogramAggregateActionConfig.getBuckets();
this.buckets = new double[bucketList.size()+2];
this.exemplarList = new ArrayList<>();
int bucketIdx = 0;
this.buckets[bucketIdx++] = -Float.MAX_VALUE;
for (int i = 0; i < bucketList.size(); i++) {
Expand Down Expand Up @@ -101,6 +110,19 @@ private double convertToDouble(Number value) {
return doubleValue;
}

public Exemplar createExemplar(final String id, final Event event, double value) {
long curTimeNanos = getTimeNanos(Instant.now());
Map<String, Object> attributes = event.toMap();
if (Objects.nonNull(id)) {
attributes.put("exemplar_id", id);
}
return new DefaultExemplar(OTelProtoCodec.convertUnixNanosToISO8601(curTimeNanos),
value,
event.get("spanId", String.class), // maybe null
event.get("traceId", String.class), // maybe null
attributes);
}

@Override
public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) {
final GroupState groupState = aggregateActionInput.getGroupState();
Expand All @@ -126,6 +148,10 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
if (this.recordMinMax) {
groupState.put(minKey, doubleValue);
groupState.put(maxKey, doubleValue);
minEvent = event;
maxEvent = event;
minValue = doubleValue;
maxValue = doubleValue;
}
} else {
Integer v = (Integer)groupState.get(countKey) + 1;
Expand All @@ -138,10 +164,14 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
double min = (double)groupState.get(minKey);
if (doubleValue < min) {
groupState.put(minKey, doubleValue);
minEvent = event;
minValue = doubleValue;
}
double max = (double)groupState.get(maxKey);
if (doubleValue > max) {
groupState.put(maxKey, doubleValue);
maxEvent = event;
maxValue = doubleValue;
}
}
}
Expand All @@ -159,6 +189,8 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
long startTimeNanos = getTimeNanos(startTime);
long endTimeNanos = getTimeNanos(endTime);
String histogramKey = HISTOGRAM_METRIC_NAME + "_key";
exemplarList.add(createExemplar("min", minEvent, minValue));
exemplarList.add(createExemplar("max", maxEvent, maxValue));
if (outputFormat.equals(OutputFormat.RAW.toString())) {
groupState.put(histogramKey, key);
groupState.put(durationKey, endTimeNanos-startTimeNanos);
Expand Down Expand Up @@ -203,6 +235,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.withBuckets(buckets)
.withBucketCountsList(bucketCounts)
.withExplicitBoundsList(explicitBoundsList)
.withExemplars(exemplarList)
.withAttributes(attr)
.build(false);
event = (Event)histogram;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.metric.JacksonMetric;
import org.opensearch.dataprepper.model.metric.Exemplar;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand Down Expand Up @@ -108,5 +109,7 @@ void testCountAggregateOTelFormat(int testCount) {
assertThat(metric.toJsonString().indexOf("attributes"), not(-1));
assertThat(result.get(0).toMap(), hasKey("startTime"));
assertThat(result.get(0).toMap(), hasKey("time"));
List<Exemplar> exemplars = (List <Exemplar>)result.get(0).toMap().get("exemplars");
assertThat(exemplars.size(), equalTo(1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.metric.JacksonMetric;
import org.opensearch.dataprepper.model.metric.Exemplar;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand Down Expand Up @@ -207,6 +208,8 @@ void testHistogramAggregateOTelFormat(int testCount) throws NoSuchFieldException
assertThat(expectedBucketCounts[i], equalTo(bucketCountsFromResult.get(i)));
}
assertThat(((Map<String, String>)result.get(0).toMap().get("attributes")), hasEntry(HistogramAggregateAction.HISTOGRAM_METRIC_NAME+"_key", testKey));
List<Exemplar> exemplars = (List <Exemplar>)result.get(0).toMap().get("exemplars");
assertThat(exemplars.size(), equalTo(2));
assertThat(((Map<String, String>)result.get(0).toMap().get("attributes")), hasEntry(dataKey, dataValue));
final String expectedDurationKey = histogramAggregateActionConfig.getDurationKey();
assertThat(((Map<String, String>)result.get(0).toMap().get("attributes")), hasKey(expectedDurationKey));
Expand Down

0 comments on commit 729d003

Please sign in to comment.