From 729d0030d7318e620cd57922c56a2abe3c58fcae Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 15 Aug 2023 06:33:32 +0000 Subject: [PATCH 1/3] Add Exemplars to metrics generated in aggregate processor Signed-off-by: Krishna Kondaka --- .../actions/CountAggregateAction.java | 18 ++++++++++ .../actions/HistogramAggregateAction.java | 33 +++++++++++++++++++ .../actions/CountAggregateActionTest.java | 3 ++ .../HistogramAggregateActionTests.java | 3 ++ 4 files changed, 57 insertions(+) diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java index 61a4ecce90..327d6101ee 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java @@ -6,6 +6,8 @@ package org.opensearch.dataprepper.plugins.processor.aggregate.actions; import org.opensearch.dataprepper.model.metric.JacksonSum; +import org.opensearch.dataprepper.model.metric.Exemplar; +import org.opensearch.dataprepper.model.metric.DefaultExemplar; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; @@ -19,6 +21,7 @@ import io.opentelemetry.proto.metrics.v1.AggregationTemporality; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.time.ZoneId; import java.time.format.DateTimeFormatter; @@ -42,12 +45,14 @@ public class CountAggregateAction implements AggregateAction { public final String startTimeKey; public final String outputFormat; private long startTimeNanos; + private List exemplarList; @DataPrepperPluginConstructor public CountAggregateAction(final CountAggregateActionConfig countAggregateActionConfig) { this.countKey = countAggregateActionConfig.getCountKey(); this.startTimeKey = countAggregateActionConfig.getStartTimeKey(); this.outputFormat = countAggregateActionConfig.getOutputFormat(); + this.exemplarList = new ArrayList<>(); } private long getTimeNanos(Instant time) { @@ -56,6 +61,17 @@ private long getTimeNanos(Instant time) { return currentTimeNanos; } + public Exemplar createExemplar(final Event event) { + long curTimeNanos = getTimeNanos(Instant.now()); + Map attributes = event.toMap(); + return new DefaultExemplar( + OTelProtoCodec.convertUnixNanosToISO8601(curTimeNanos), + 1.0, + event.get("spanId", String.class), // maybe null + event.get("traceId", String.class), // maybe null + attributes); + } + @Override public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) { final GroupState groupState = aggregateActionInput.getGroupState(); @@ -63,6 +79,7 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct groupState.put(startTimeKey, Instant.now()); groupState.putAll(aggregateActionInput.getIdentificationKeys()); groupState.put(countKey, 1); + exemplarList.add(createExemplar(event)); } else { Integer v = (Integer)groupState.get(countKey) + 1; groupState.put(countKey, v); @@ -98,6 +115,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA .withUnit(SUM_METRIC_UNIT) .withAggregationTemporality(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA.name()) .withValue((double)countValue) + .withExemplars(exemplarList) .withAttributes(attr) .build(false); event = (Event)sum; diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java index f535a10e7c..33cadcc483 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java @@ -7,6 +7,8 @@ import org.opensearch.dataprepper.model.metric.JacksonHistogram; import org.opensearch.dataprepper.model.metric.Bucket; +import org.opensearch.dataprepper.model.metric.Exemplar; +import org.opensearch.dataprepper.model.metric.DefaultExemplar; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; @@ -25,6 +27,7 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Map; +import java.util.Objects; import java.util.List; import java.util.HashMap; import java.util.Arrays; @@ -53,6 +56,11 @@ public class HistogramAggregateAction implements AggregateAction { private final String key; private final String units; private final boolean recordMinMax; + private List exemplarList; + private Event minEvent; + private Event maxEvent; + private double minValue; + private double maxValue; private long startTimeNanos; private double[] buckets; @@ -62,6 +70,7 @@ public HistogramAggregateAction(final HistogramAggregateActionConfig histogramAg this.key = histogramAggregateActionConfig.getKey(); List bucketList = histogramAggregateActionConfig.getBuckets(); this.buckets = new double[bucketList.size()+2]; + this.exemplarList = new ArrayList<>(); int bucketIdx = 0; this.buckets[bucketIdx++] = -Float.MAX_VALUE; for (int i = 0; i < bucketList.size(); i++) { @@ -101,6 +110,19 @@ private double convertToDouble(Number value) { return doubleValue; } + public Exemplar createExemplar(final String id, final Event event, double value) { + long curTimeNanos = getTimeNanos(Instant.now()); + Map attributes = event.toMap(); + if (Objects.nonNull(id)) { + attributes.put("exemplar_id", id); + } + return new DefaultExemplar(OTelProtoCodec.convertUnixNanosToISO8601(curTimeNanos), + value, + event.get("spanId", String.class), // maybe null + event.get("traceId", String.class), // maybe null + attributes); + } + @Override public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) { final GroupState groupState = aggregateActionInput.getGroupState(); @@ -126,6 +148,10 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct if (this.recordMinMax) { groupState.put(minKey, doubleValue); groupState.put(maxKey, doubleValue); + minEvent = event; + maxEvent = event; + minValue = doubleValue; + maxValue = doubleValue; } } else { Integer v = (Integer)groupState.get(countKey) + 1; @@ -138,10 +164,14 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct double min = (double)groupState.get(minKey); if (doubleValue < min) { groupState.put(minKey, doubleValue); + minEvent = event; + minValue = doubleValue; } double max = (double)groupState.get(maxKey); if (doubleValue > max) { groupState.put(maxKey, doubleValue); + maxEvent = event; + maxValue = doubleValue; } } } @@ -159,6 +189,8 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA long startTimeNanos = getTimeNanos(startTime); long endTimeNanos = getTimeNanos(endTime); String histogramKey = HISTOGRAM_METRIC_NAME + "_key"; + exemplarList.add(createExemplar("min", minEvent, minValue)); + exemplarList.add(createExemplar("max", maxEvent, maxValue)); if (outputFormat.equals(OutputFormat.RAW.toString())) { groupState.put(histogramKey, key); groupState.put(durationKey, endTimeNanos-startTimeNanos); @@ -203,6 +235,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA .withBuckets(buckets) .withBucketCountsList(bucketCounts) .withExplicitBoundsList(explicitBoundsList) + .withExemplars(exemplarList) .withAttributes(attr) .build(false); event = (Event)histogram; diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java index 9e35507a2a..0f96a83c41 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.metric.JacksonMetric; +import org.opensearch.dataprepper.model.metric.Exemplar; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -108,5 +109,7 @@ void testCountAggregateOTelFormat(int testCount) { assertThat(metric.toJsonString().indexOf("attributes"), not(-1)); assertThat(result.get(0).toMap(), hasKey("startTime")); assertThat(result.get(0).toMap(), hasKey("time")); + List exemplars = (List )result.get(0).toMap().get("exemplars"); + assertThat(exemplars.size(), equalTo(1)); } } diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionTests.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionTests.java index 6e9d783998..b2b498306b 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionTests.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionTests.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.metric.JacksonMetric; +import org.opensearch.dataprepper.model.metric.Exemplar; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -207,6 +208,8 @@ void testHistogramAggregateOTelFormat(int testCount) throws NoSuchFieldException assertThat(expectedBucketCounts[i], equalTo(bucketCountsFromResult.get(i))); } assertThat(((Map)result.get(0).toMap().get("attributes")), hasEntry(HistogramAggregateAction.HISTOGRAM_METRIC_NAME+"_key", testKey)); + List exemplars = (List )result.get(0).toMap().get("exemplars"); + assertThat(exemplars.size(), equalTo(2)); assertThat(((Map)result.get(0).toMap().get("attributes")), hasEntry(dataKey, dataValue)); final String expectedDurationKey = histogramAggregateActionConfig.getDurationKey(); assertThat(((Map)result.get(0).toMap().get("attributes")), hasKey(expectedDurationKey)); From b4ad42e5b94ae7db8aadc7c8f8d434aa396c9d67 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 15 Aug 2023 06:47:08 +0000 Subject: [PATCH 2/3] Updated documentation Signed-off-by: Krishna Kondaka --- data-prepper-plugins/aggregate-processor/README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/data-prepper-plugins/aggregate-processor/README.md b/data-prepper-plugins/aggregate-processor/README.md index a932c9a28d..2876aaa4dd 100644 --- a/data-prepper-plugins/aggregate-processor/README.md +++ b/data-prepper-plugins/aggregate-processor/README.md @@ -98,7 +98,7 @@ While not necessary, a great way to set up the Aggregate Processor [identificati The values in a list are merely appended, so there can be duplicates. ### -* `count`: Count Events belonging to the same group and generate a new event with values of the identification keys and the count, indicating the number of events. All Events that make up the combined Event will be dropped. +* `count`: Count Events belonging to the same group and generate a new event with values of the identification keys and the count, indicating the number of events. All Events that make up the combined Event will be dropped. One of the events is added as exemplar. If the aggregation is done on traces, then traceId and spanId are included in the exemplar, otherwise, spanId and traceId would be null. * It supports the following config options * `count_key`: key name to use for storing the count, default name is `aggr._count` * `start_time_key`: key name to use for storing the start time, default name is `aggr._start_time` @@ -114,7 +114,7 @@ While not necessary, a great way to set up the Aggregate Processor [identificati ``` The following Event will be created and processed by the rest of the pipeline when the group is concluded: ```json - {"isMonotonic":true,"unit":"1","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","kind":"SUM","name":"count","description":"Number of events","startTime":"2022-12-02T19:29:51.245358486Z","time":"2022-12-02T19:30:15.247799684Z","value":3.0,"sourceIp":"127.0.0.1","destinationIp":"192.168.0.1"} + {"isMonotonic":true,"unit":"1","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","kind":"SUM","name":"count","description":"Number of events","startTime":"2022-12-02T19:29:51.245358486Z","time":"2022-12-02T19:30:15.247799684Z","value":3.0,"sourceIp":"127.0.0.1","destinationIp":"192.168.0.1", "exemplars":[{"time":"2022-12-02T19:29:51.245358486Z", "value": 1.0, "attributes":{"sourceIp":"127.0.0.1","destinationIp": "192.168.0.1", "request" : "/index.html"}, "spanId":null, "traceId":null}]} ``` If raw output format is used, the following Event will be created and processed by the rest of the pipeline when the group is concluded: ```json @@ -130,7 +130,7 @@ While not necessary, a great way to set up the Aggregate Processor [identificati ``` ### -* `histogram`: Aggreates events belonging to the same group and generate a new event with values of the identification keys and histogram of the aggregated events based on a configured `key`. The histogram contains the number of events, sum, buckets, bucket counts, and optionally min and max of the values corresponding to the `key`. All events that make up the combined Event will be dropped. +* `histogram`: Aggreates events belonging to the same group and generate a new event with values of the identification keys and histogram of the aggregated events based on a configured `key`. The histogram contains the number of events, sum, buckets, bucket counts, and optionally min and max of the values corresponding to the `key`. All events that make up the combined Event will be dropped. Events corresponding to min and max values are added as exemplars. If the aggregation is done on traces, then traceId and spanId are included in the exemplar, otherwise, spanId and traceId would be null. * It supports the following config options * `key`: name of the field in the events for which histogram needs to be generated * `generated_key_prefix`: key prefix to be used for all the fields created in the aggregated event. This allows the user to make sure that the names of the histogram event does not conflict with the field names in the event @@ -149,7 +149,7 @@ While not necessary, a great way to set up the Aggregate Processor [identificati ``` The following Event will be created and processed by the rest of the pipeline when the group is concluded: ```json - {"max":0.55,"kind":"HISTOGRAM","buckets":[{"min":-3.4028234663852886E38,"max":0.0,"count":0},{"min":0.0,"max":0.25,"count":2},{"min":0.25,"max":0.50,"count":1},{"min":0.50,"max":3.4028234663852886E38,"count":1}],"count":4,"bucketCountsList":[0,2,1,1],"description":"Histogram of latency in the events","sum":1.15,"unit":"seconds","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","min":0.15,"bucketCounts":4,"name":"histogram","startTime":"2022-12-14T06:43:40.848762215Z","explicitBoundsCount":3,"time":"2022-12-14T06:44:04.852564623Z","explicitBounds":[0.0,0.25,0.5],"request":"/index.html","sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "key": "latency"} + {"max":0.55,"kind":"HISTOGRAM","buckets":[{"min":-3.4028234663852886E38,"max":0.0,"count":0},{"min":0.0,"max":0.25,"count":2},{"min":0.25,"max":0.50,"count":1},{"min":0.50,"max":3.4028234663852886E38,"count":1}],"count":4,"bucketCountsList":[0,2,1,1],"description":"Histogram of latency in the events","sum":1.15,"unit":"seconds","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","min":0.15,"bucketCounts":4,"name":"histogram","startTime":"2022-12-14T06:43:40.848762215Z","explicitBoundsCount":3,"time":"2022-12-14T06:44:04.852564623Z","explicitBounds":[0.0,0.25,0.5],"request":"/index.html","sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "key": "latency", "exemplars": [{"time":"2023-12-14T06:43:43.840684487Z","value":0.50,"attributes":{"sourceIp":"127.0.0.1","destinationIp": "192.168.0.1", "request" : "/index.html", "exemplar_id":"min"},"spanId":null,"traceId":null},{"time":"2022-12-14T06:43:43.844339030Z","value":0.55,"attributes":{"sourceIp":"127.0.0.1","destinationIp": "192.168.0.1", "request" : "/index.html","exemplar_id":"max"},"spanId":null,"traceId":null}]} ``` If raw output format is used, the following event will be created and processed by the rest of the pipeline when the group is concluded: ```json From a4968efacc943b9f421ea836fae6ca83ec65a513 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 16 Aug 2023 16:04:25 +0000 Subject: [PATCH 3/3] Addressed review comments Signed-off-by: Krishna Kondaka --- .../aggregate/actions/CountAggregateAction.java | 12 ++++++++++-- .../actions/HistogramAggregateAction.java | 14 +++++++++++--- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java index 327d6101ee..382d1cc99b 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java @@ -13,6 +13,7 @@ import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.trace.Span; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput; @@ -64,11 +65,18 @@ private long getTimeNanos(Instant time) { public Exemplar createExemplar(final Event event) { long curTimeNanos = getTimeNanos(Instant.now()); Map attributes = event.toMap(); + String spanId = null; + String traceId = null; + if (event instanceof Span) { + Span span = (Span)event; + spanId = span.getSpanId(); + traceId = span.getTraceId(); + } return new DefaultExemplar( OTelProtoCodec.convertUnixNanosToISO8601(curTimeNanos), 1.0, - event.get("spanId", String.class), // maybe null - event.get("traceId", String.class), // maybe null + spanId, + traceId, attributes); } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java index 33cadcc483..6db82130ea 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.metric.JacksonHistogram; import org.opensearch.dataprepper.model.metric.Bucket; import org.opensearch.dataprepper.model.metric.Exemplar; +import org.opensearch.dataprepper.model.trace.Span; import org.opensearch.dataprepper.model.metric.DefaultExemplar; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; @@ -114,12 +115,19 @@ public Exemplar createExemplar(final String id, final Event event, double value) long curTimeNanos = getTimeNanos(Instant.now()); Map attributes = event.toMap(); if (Objects.nonNull(id)) { - attributes.put("exemplar_id", id); + attributes.put("exemplarId", id); + } + String spanId = null; + String traceId = null; + if (event instanceof Span) { + Span span = (Span)event; + spanId = span.getSpanId(); + traceId = span.getTraceId(); } return new DefaultExemplar(OTelProtoCodec.convertUnixNanosToISO8601(curTimeNanos), value, - event.get("spanId", String.class), // maybe null - event.get("traceId", String.class), // maybe null + spanId, + traceId, attributes); }