Add Exemplars to metrics generated in aggregate processor #3165

Merged · 3 commits · Aug 17, 2023

Changes from 2 commits
8 changes: 4 additions & 4 deletions data-prepper-plugins/aggregate-processor/README.md
@@ -98,7 +98,7 @@ While not necessary, a great way to set up the Aggregate Processor [identificati
The values in a list are merely appended, so there can be duplicates.

### <a name="count"></a>
- * `count`: Count Events belonging to the same group and generate a new event with values of the identification keys and the count, indicating the number of events. All Events that make up the combined Event will be dropped.
+ * `count`: Count Events belonging to the same group and generate a new event with values of the identification keys and the count, indicating the number of events. All Events that make up the combined Event will be dropped. One of the events is added as an exemplar (see the sketch after the example output below). If the aggregation is done on traces, the exemplar includes the traceId and spanId; otherwise, spanId and traceId are null.
Member: GitHub appears to be reporting some sort of formatting error. Is the Markdown correct?

Collaborator (Author): I don't know. When you view it in "RichText Format", it seems fine to me.

* It supports the following config options
* `count_key`: key name to use for storing the count, default name is `aggr._count`
* `start_time_key`: key name to use for storing the start time, default name is `aggr._start_time`
@@ -114,7 +114,7 @@ While not necessary, a great way to set up the Aggregate Processor [identificati
```
The following Event will be created and processed by the rest of the pipeline when the group is concluded:
```json
{"isMonotonic":true,"unit":"1","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","kind":"SUM","name":"count","description":"Number of events","startTime":"2022-12-02T19:29:51.245358486Z","time":"2022-12-02T19:30:15.247799684Z","value":3.0,"sourceIp":"127.0.0.1","destinationIp":"192.168.0.1"}
{"isMonotonic":true,"unit":"1","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","kind":"SUM","name":"count","description":"Number of events","startTime":"2022-12-02T19:29:51.245358486Z","time":"2022-12-02T19:30:15.247799684Z","value":3.0,"sourceIp":"127.0.0.1","destinationIp":"192.168.0.1", "exemplars":[{"time":"2022-12-02T19:29:51.245358486Z", "value": 1.0, "attributes":{"sourceIp":"127.0.0.1","destinationIp": "192.168.0.1", "request" : "/index.html"}, "spanId":null, "traceId":null}]}
```
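For readers tracing where the `exemplars` array above comes from, here is a minimal sketch of the construction, modeled on the `createExemplar` method this PR adds to `CountAggregateAction`. The `CountExemplarSketch` wrapper class and the inline epoch-nanos arithmetic are illustrative assumptions; `DefaultExemplar`, `OTelProtoCodec.convertUnixNanosToISO8601`, and `Event#get` are the types and calls used in the diff below.
```java
import java.time.Instant;
import java.util.Map;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.metric.DefaultExemplar;
import org.opensearch.dataprepper.model.metric.Exemplar;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;

// Illustrative wrapper class; not part of the plugin's API.
class CountExemplarSketch {
    static Exemplar buildCountExemplar(final Event event) {
        // Wall-clock time as Unix epoch nanoseconds, matching getTimeNanos in the diff.
        final Instant now = Instant.now();
        final long nowNanos = now.getEpochSecond() * 1_000_000_000L + now.getNano();
        final Map<String, Object> attributes = event.toMap();  // event fields become exemplar attributes
        return new DefaultExemplar(
                OTelProtoCodec.convertUnixNanosToISO8601(nowNanos),
                1.0,                                 // the exemplar stands for a single counted event
                event.get("spanId", String.class),   // null unless aggregating trace data
                event.get("traceId", String.class),  // null unless aggregating trace data
                attributes);
    }
}
```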
If raw output format is used, the following Event will be created and processed by the rest of the pipeline when the group is concluded:
```json
@@ -130,7 +130,7 @@ While not necessary, a great way to set up the Aggregate Processor [identificati
```

### <a name="histogram"></a>
- * `histogram`: Aggregates events belonging to the same group and generates a new event with values of the identification keys and a histogram of the aggregated events based on a configured `key`. The histogram contains the number of events, sum, buckets, bucket counts, and optionally min and max of the values corresponding to the `key`. All events that make up the combined Event will be dropped.
+ * `histogram`: Aggregates events belonging to the same group and generates a new event with values of the identification keys and a histogram of the aggregated events based on a configured `key`. The histogram contains the number of events, sum, buckets, bucket counts, and optionally min and max of the values corresponding to the `key`. All events that make up the combined Event will be dropped. Events corresponding to the min and max values are added as exemplars (see the sketch after the example output below). If the aggregation is done on traces, the exemplars include the traceId and spanId; otherwise, spanId and traceId are null.
* It supports the following config options
* `key`: name of the field in the events for which histogram needs to be generated
* `generated_key_prefix`: key prefix to be used for all the fields created in the aggregated event. This allows the user to make sure that the names of the histogram event do not conflict with the field names in the event
@@ -149,7 +149,7 @@ While not necessary, a great way to set up the Aggregate Processor [identificati
```
The following Event will be created and processed by the rest of the pipeline when the group is concluded:
```json
{"max":0.55,"kind":"HISTOGRAM","buckets":[{"min":-3.4028234663852886E38,"max":0.0,"count":0},{"min":0.0,"max":0.25,"count":2},{"min":0.25,"max":0.50,"count":1},{"min":0.50,"max":3.4028234663852886E38,"count":1}],"count":4,"bucketCountsList":[0,2,1,1],"description":"Histogram of latency in the events","sum":1.15,"unit":"seconds","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","min":0.15,"bucketCounts":4,"name":"histogram","startTime":"2022-12-14T06:43:40.848762215Z","explicitBoundsCount":3,"time":"2022-12-14T06:44:04.852564623Z","explicitBounds":[0.0,0.25,0.5],"request":"/index.html","sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "key": "latency"}
{"max":0.55,"kind":"HISTOGRAM","buckets":[{"min":-3.4028234663852886E38,"max":0.0,"count":0},{"min":0.0,"max":0.25,"count":2},{"min":0.25,"max":0.50,"count":1},{"min":0.50,"max":3.4028234663852886E38,"count":1}],"count":4,"bucketCountsList":[0,2,1,1],"description":"Histogram of latency in the events","sum":1.15,"unit":"seconds","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","min":0.15,"bucketCounts":4,"name":"histogram","startTime":"2022-12-14T06:43:40.848762215Z","explicitBoundsCount":3,"time":"2022-12-14T06:44:04.852564623Z","explicitBounds":[0.0,0.25,0.5],"request":"/index.html","sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "key": "latency", "exemplars": [{"time":"2023-12-14T06:43:43.840684487Z","value":0.50,"attributes":{"sourceIp":"127.0.0.1","destinationIp": "192.168.0.1", "request" : "/index.html", "exemplar_id":"min"},"spanId":null,"traceId":null},{"time":"2022-12-14T06:43:43.844339030Z","value":0.55,"attributes":{"sourceIp":"127.0.0.1","destinationIp": "192.168.0.1", "request" : "/index.html","exemplar_id":"max"},"spanId":null,"traceId":null}]}
```
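The two entries in `exemplars` above are the events that carried the smallest and largest observed values. Below is a compact sketch of that selection logic; the `MinMaxTracker` class and its accessors are illustrative stand-ins for the `minEvent`/`maxEvent`/`minValue`/`maxValue` fields that `HistogramAggregateAction` maintains in the diff below.
```java
import org.opensearch.dataprepper.model.event.Event;

// Illustrative tracker; the real action keeps equivalent state in instance fields.
class MinMaxTracker {
    private Event minEvent;
    private Event maxEvent;
    private double minValue = Double.POSITIVE_INFINITY;
    private double maxValue = Double.NEGATIVE_INFINITY;

    void observe(final Event event, final double value) {
        // The first observed event seeds both bounds.
        if (value < minValue) {
            minValue = value;
            minEvent = event;
        }
        if (value > maxValue) {
            maxValue = value;
            maxEvent = event;
        }
    }

    Event minEvent() { return minEvent; }   // becomes the "min" exemplar at group conclusion
    Event maxEvent() { return maxEvent; }   // becomes the "max" exemplar at group conclusion
}
```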
If raw output format is used, the following event will be created and processed by the rest of the pipeline when the group is concluded:
```json
CountAggregateAction.java:

@@ -6,6 +6,8 @@
package org.opensearch.dataprepper.plugins.processor.aggregate.actions;

import org.opensearch.dataprepper.model.metric.JacksonSum;
import org.opensearch.dataprepper.model.metric.Exemplar;
import org.opensearch.dataprepper.model.metric.DefaultExemplar;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
@@ -19,6 +21,7 @@
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
@@ -42,12 +45,14 @@ public class CountAggregateAction implements AggregateAction {
public final String startTimeKey;
public final String outputFormat;
private long startTimeNanos;
private List<Exemplar> exemplarList;

@DataPrepperPluginConstructor
public CountAggregateAction(final CountAggregateActionConfig countAggregateActionConfig) {
this.countKey = countAggregateActionConfig.getCountKey();
this.startTimeKey = countAggregateActionConfig.getStartTimeKey();
this.outputFormat = countAggregateActionConfig.getOutputFormat();
this.exemplarList = new ArrayList<>();
}

private long getTimeNanos(Instant time) {
@@ -56,13 +61,25 @@ private long getTimeNanos(Instant time) {
return currentTimeNanos;
}

public Exemplar createExemplar(final Event event) {
long curTimeNanos = getTimeNanos(Instant.now());
Map<String, Object> attributes = event.toMap();
Member: I'm not sure this is exactly what the attributes is meant to convey. In the model these are filtered_attributes. And the current code in Data Prepper for metric exemplars uses only the provided filtered_attributes.

Collaborator (Author): Yeah. I do not know why SAP folks called it Attributes in the DataPrepper implementation.
return new DefaultExemplar(
OTelProtoCodec.convertUnixNanosToISO8601(curTimeNanos),
1.0,
event.get("spanId", String.class), // maybe null
Member: This value should only be found when this is a SpanEvent. It may be better to check if this is a span and then handle as necessary, e.g.:

    String spanId = null;
    String traceId = null;
    if (event instanceof SpanEvent) {
        SpanEvent spanEvent = (SpanEvent) event;
        spanId = spanEvent.getSpanId();
        traceId = spanEvent.getTraceId();
    }

    return new DefaultExemplar(
            OTelProtoCodec.convertUnixNanosToISO8601(curTimeNanos),
            1.0,
            spanId,
            traceId,
            attributes);
event.get("traceId", String.class), // maybe null
attributes);
}

@Override
public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) {
final GroupState groupState = aggregateActionInput.getGroupState();
if (groupState.get(countKey) == null) {
groupState.put(startTimeKey, Instant.now());
groupState.putAll(aggregateActionInput.getIdentificationKeys());
groupState.put(countKey, 1);
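// capture the first event of the group as this metric's exemplar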
exemplarList.add(createExemplar(event));
} else {
Integer v = (Integer)groupState.get(countKey) + 1;
groupState.put(countKey, v);
@@ -98,6 +115,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.withUnit(SUM_METRIC_UNIT)
.withAggregationTemporality(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA.name())
.withValue((double)countValue)
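// attach the exemplar recorded when the group was opened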
.withExemplars(exemplarList)
.withAttributes(attr)
.build(false);
event = (Event)sum;
HistogramAggregateAction.java:

@@ -7,6 +7,8 @@

import org.opensearch.dataprepper.model.metric.JacksonHistogram;
import org.opensearch.dataprepper.model.metric.Bucket;
import org.opensearch.dataprepper.model.metric.Exemplar;
import org.opensearch.dataprepper.model.metric.DefaultExemplar;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
@@ -25,6 +27,7 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Objects;
import java.util.List;
import java.util.HashMap;
import java.util.Arrays;
@@ -53,6 +56,11 @@ public class HistogramAggregateAction implements AggregateAction {
private final String key;
private final String units;
private final boolean recordMinMax;
private List<Exemplar> exemplarList;
private Event minEvent;
private Event maxEvent;
private double minValue;
private double maxValue;

private long startTimeNanos;
private double[] buckets;
@@ -62,6 +70,7 @@ public HistogramAggregateAction(final HistogramAggregateActionConfig histogramAg
this.key = histogramAggregateActionConfig.getKey();
List<Number> bucketList = histogramAggregateActionConfig.getBuckets();
this.buckets = new double[bucketList.size()+2];
this.exemplarList = new ArrayList<>();
int bucketIdx = 0;
this.buckets[bucketIdx++] = -Float.MAX_VALUE;
for (int i = 0; i < bucketList.size(); i++) {
@@ -101,6 +110,19 @@ private double convertToDouble(Number value) {
return doubleValue;
}

public Exemplar createExemplar(final String id, final Event event, double value) {
long curTimeNanos = getTimeNanos(Instant.now());
Map<String, Object> attributes = event.toMap();
if (Objects.nonNull(id)) {
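// tag the exemplar so the min and max entries can be told apart downstream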
attributes.put("exemplar_id", id);
Member: Is this the right naming convention? Does this work with the OpenSearch schemas?

Collaborator (Author): I changed it to exemplarId.
}
return new DefaultExemplar(OTelProtoCodec.convertUnixNanosToISO8601(curTimeNanos),
value,
event.get("spanId", String.class), // maybe null
event.get("traceId", String.class), // maybe null
attributes);
}

@Override
public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) {
final GroupState groupState = aggregateActionInput.getGroupState();
@@ -126,6 +148,10 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
if (this.recordMinMax) {
groupState.put(minKey, doubleValue);
groupState.put(maxKey, doubleValue);
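// the first event in the group seeds both the min and max exemplars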
minEvent = event;
maxEvent = event;
minValue = doubleValue;
maxValue = doubleValue;
}
} else {
Integer v = (Integer)groupState.get(countKey) + 1;
@@ -138,10 +164,14 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
double min = (double)groupState.get(minKey);
if (doubleValue < min) {
groupState.put(minKey, doubleValue);
minEvent = event;
minValue = doubleValue;
}
double max = (double)groupState.get(maxKey);
if (doubleValue > max) {
groupState.put(maxKey, doubleValue);
maxEvent = event;
maxValue = doubleValue;
}
}
}
@@ -159,6 +189,8 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
long startTimeNanos = getTimeNanos(startTime);
long endTimeNanos = getTimeNanos(endTime);
String histogramKey = HISTOGRAM_METRIC_NAME + "_key";
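// emit the events that carried the smallest and largest values as exemplars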
exemplarList.add(createExemplar("min", minEvent, minValue));
exemplarList.add(createExemplar("max", maxEvent, maxValue));
if (outputFormat.equals(OutputFormat.RAW.toString())) {
groupState.put(histogramKey, key);
groupState.put(durationKey, endTimeNanos-startTimeNanos);
@@ -203,6 +235,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.withBuckets(buckets)
.withBucketCountsList(bucketCounts)
.withExplicitBoundsList(explicitBoundsList)
.withExemplars(exemplarList)
.withAttributes(attr)
.build(false);
event = (Event)histogram;
CountAggregateAction tests:

@@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.metric.JacksonMetric;
import org.opensearch.dataprepper.model.metric.Exemplar;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -108,5 +109,7 @@ void testCountAggregateOTelFormat(int testCount) {
assertThat(metric.toJsonString().indexOf("attributes"), not(-1));
assertThat(result.get(0).toMap(), hasKey("startTime"));
assertThat(result.get(0).toMap(), hasKey("time"));
List<Exemplar> exemplars = (List<Exemplar>)result.get(0).toMap().get("exemplars");
assertThat(exemplars.size(), equalTo(1));
}
}
HistogramAggregateAction tests:

@@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.metric.JacksonMetric;
import org.opensearch.dataprepper.model.metric.Exemplar;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -207,6 +208,8 @@ void testHistogramAggregateOTelFormat(int testCount) throws NoSuchFieldException
assertThat(expectedBucketCounts[i], equalTo(bucketCountsFromResult.get(i)));
}
assertThat(((Map<String, String>)result.get(0).toMap().get("attributes")), hasEntry(HistogramAggregateAction.HISTOGRAM_METRIC_NAME+"_key", testKey));
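// the histogram action attaches two exemplars: one for the min event, one for the max event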
List<Exemplar> exemplars = (List<Exemplar>)result.get(0).toMap().get("exemplars");
assertThat(exemplars.size(), equalTo(2));
assertThat(((Map<String, String>)result.get(0).toMap().get("attributes")), hasEntry(dataKey, dataValue));
final String expectedDurationKey = histogramAggregateActionConfig.getDurationKey();
assertThat(((Map<String, String>)result.get(0).toMap().get("attributes")), hasKey(expectedDurationKey));