From d47dc561f3b36548bca057cca99ba340ee65e23c Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Wed, 4 Dec 2024 02:18:52 -0800 Subject: [PATCH] Support FunnelEventsFunctionEvalAggregationFunction and FunnelStepDurationStatsAggregationFunction functions --- .../function/scalar/ArrayFunctions.java | 25 + .../pinot/core/common/ObjectSerDeUtils.java | 59 ++ .../common/datablock/DataBlockBuilder.java | 28 +- .../docvalsets/RowBasedBlockValSet.java | 124 +++++ .../function/AggregationFunctionFactory.java | 6 + .../FunnelStepEventWithExtraFields.java | 104 ++++ .../window/FunnelBaseAggregationFunction.java | 5 +- ...EventsFunctionEvalAggregationFunction.java | 502 ++++++++++++++++++ ...lStepDurationStatsAggregationFunction.java | 261 +++++++++ .../tests/custom/WindowFunnelTest.java | 151 ++++++ .../segment/spi/AggregationFunctionType.java | 5 + 11 files changed, 1265 insertions(+), 5 deletions(-) create mode 100644 pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelStepEventWithExtraFields.java create mode 100644 pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelEventsFunctionEvalAggregationFunction.java create mode 100644 pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelStepDurationStatsAggregationFunction.java diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java index 5a165f12a7ec..f5abf48d2f19 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java @@ -365,4 +365,29 @@ public static String arrayToString(String[] values, String delimiter, String nul .map(s -> s == null || s.equals(NullValuePlaceHolder.STRING) ? nullString : s) .toArray(String[]::new)); } + + @ScalarFunction + public static int arrayLengthInt(int[] values) { + return values.length; + } + + @ScalarFunction + public static int arrayLengthLong(long[] values) { + return values.length; + } + + @ScalarFunction + public static int arrayLengthFloat(float[] values) { + return values.length; + } + + @ScalarFunction + public static int arrayLengthDouble(double[] values) { + return values.length; + } + + @ScalarFunction + public static int arrayLengthString(String[] values) { + return values.length; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java index 379c697f76ab..8650ae55bf63 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java @@ -77,6 +77,7 @@ import org.apache.pinot.common.CustomObject; import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEvent; +import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEventWithExtraFields; import org.apache.pinot.core.query.aggregation.utils.exprminmax.ExprMinMaxObject; import org.apache.pinot.core.query.utils.idset.IdSet; import org.apache.pinot.core.query.utils.idset.IdSets; @@ -1747,6 +1748,63 @@ public PriorityQueue deserialize(ByteBuffer byteBuffer) { } }; + public static final ObjectSerDe> + FUNNEL_STEP_EVENT_WITH_EXTRA_FIELDS_ACCUMULATOR_SER_DE = + new ObjectSerDe>() { + + @Override + public byte[] serialize(PriorityQueue funnelStepEvents) { + int numEvents = funnelStepEvents.size(); + List serializedEvents = new ArrayList<>(numEvents); + long bufferSize = Integer.BYTES; // Start with size for number of events + + // First pass: Serialize each event and calculate total buffer size + for (FunnelStepEventWithExtraFields funnelStepEvent : funnelStepEvents) { + byte[] eventBytes = funnelStepEvent.getBytes(); // Costly operation, compute only once + serializedEvents.add(eventBytes); // Store serialized form + bufferSize += Integer.BYTES; // Add size for storing length + bufferSize += eventBytes.length; // Add size of serialized content + } + + // Ensure the total buffer size doesn't exceed 2GB + Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Buffer size exceeds 2GB"); + + // Allocate buffer + byte[] bytes = new byte[(int) bufferSize]; + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + + // Second pass: Write data to the buffer + byteBuffer.putInt(numEvents); // Write number of events + for (byte[] eventBytes : serializedEvents) { + byteBuffer.putInt(eventBytes.length); // Write length of each event + byteBuffer.put(eventBytes); // Write event content + } + + return bytes; + } + + @Override + public PriorityQueue deserialize(byte[] bytes) { + return deserialize(ByteBuffer.wrap(bytes)); + } + + @Override + public PriorityQueue deserialize(ByteBuffer byteBuffer) { + int size = byteBuffer.getInt(); + if (size == 0) { + return new PriorityQueue<>(); + } + PriorityQueue funnelStepEvents = new PriorityQueue<>(size); + for (int i = 0; i < size; i++) { + int funnelStepEventWithExtraFieldsByteSize = byteBuffer.getInt(); + byte[] bytes = new byte[funnelStepEventWithExtraFieldsByteSize]; + byteBuffer.get(bytes); + funnelStepEvents.add(new FunnelStepEventWithExtraFields(bytes)); + } + return funnelStepEvents; + } + }; + // NOTE: DO NOT change the order, it has to be the same order as the ObjectType //@formatter:off private static final ObjectSerDe[] SER_DES = { @@ -1802,6 +1860,7 @@ public PriorityQueue deserialize(ByteBuffer byteBuffer) { DATA_SKETCH_CPC_ACCUMULATOR_SER_DE, ORDERED_STRING_SET_SER_DE, FUNNEL_STEP_EVENT_ACCUMULATOR_SER_DE, + FUNNEL_STEP_EVENT_WITH_EXTRA_FIELDS_ACCUMULATOR_SER_DE, }; //@formatter:on diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java index c795f5af2544..0c746328e200 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java @@ -19,6 +19,10 @@ package org.apache.pinot.core.common.datablock; import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import it.unimi.dsi.fastutil.floats.FloatArrayList; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import java.io.IOException; @@ -121,16 +125,32 @@ public static RowDataBlock buildFromRows(List rows, DataSchema dataSch break; // Multi-value column case INT_ARRAY: - setColumn(fixedSize, varSize, (int[]) value); + if (value instanceof IntArrayList) { + setColumn(fixedSize, varSize, ((IntArrayList) value).elements()); + } else { + setColumn(fixedSize, varSize, (int[]) value); + } break; case LONG_ARRAY: - setColumn(fixedSize, varSize, (long[]) value); + if (value instanceof LongArrayList) { + setColumn(fixedSize, varSize, ((LongArrayList) value).elements()); + } else { + setColumn(fixedSize, varSize, (long[]) value); + } break; case FLOAT_ARRAY: - setColumn(fixedSize, varSize, (float[]) value); + if (value instanceof FloatArrayList) { + setColumn(fixedSize, varSize, ((FloatArrayList) value).elements()); + } else { + setColumn(fixedSize, varSize, (float[]) value); + } break; case DOUBLE_ARRAY: - setColumn(fixedSize, varSize, (double[]) value); + if (value instanceof DoubleArrayList) { + setColumn(fixedSize, varSize, ((DoubleArrayList) value).elements()); + } else { + setColumn(fixedSize, varSize, (double[]) value); + } break; case STRING_ARRAY: setColumn(fixedSize, varSize, (String[]) value, dictionary); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/RowBasedBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/RowBasedBlockValSet.java index c18cd3acb486..400b9f5d2b5d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/RowBasedBlockValSet.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/RowBasedBlockValSet.java @@ -18,6 +18,10 @@ */ package org.apache.pinot.core.operator.docvalsets; +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.math.BigDecimal; import java.util.Arrays; import java.util.List; @@ -435,6 +439,30 @@ public int[][] getIntValuesMV() { for (int j = 0; j < stringArray.length; j++) { values[i][j] = Integer.parseInt(stringArray[j]); } + } else if (storedValue instanceof IntArrayList) { + IntArrayList intArrayList = (IntArrayList) storedValue; + values[i] = new int[intArrayList.size()]; + for (int j = 0; j < intArrayList.size(); j++) { + values[i][j] = intArrayList.getInt(j); + } + } else if (storedValue instanceof LongArrayList) { + LongArrayList longArrayList = (LongArrayList) storedValue; + values[i] = new int[longArrayList.size()]; + for (int j = 0; j < longArrayList.size(); j++) { + values[i][j] = (int) longArrayList.getLong(j); + } + } else if (storedValue instanceof DoubleArrayList) { + DoubleArrayList doubleArrayList = (DoubleArrayList) storedValue; + values[i] = new int[doubleArrayList.size()]; + for (int j = 0; j < doubleArrayList.size(); j++) { + values[i][j] = (int) doubleArrayList.getDouble(j); + } + } else if (storedValue instanceof ObjectArrayList) { + ObjectArrayList list = (ObjectArrayList) storedValue; + values[i] = new int[list.size()]; + for (int j = 0; j < list.size(); j++) { + values[i][j] = Integer.parseInt(list.get(j).toString()); + } } else { throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName()); } @@ -475,6 +503,30 @@ public long[][] getLongValuesMV() { for (int j = 0; j < stringArray.length; j++) { values[i][j] = Long.parseLong(stringArray[j]); } + } else if (storedValue instanceof IntArrayList) { + IntArrayList intArrayList = (IntArrayList) storedValue; + values[i] = new long[intArrayList.size()]; + for (int j = 0; j < intArrayList.size(); j++) { + values[i][j] = intArrayList.getInt(j); + } + } else if (storedValue instanceof LongArrayList) { + LongArrayList longArrayList = (LongArrayList) storedValue; + values[i] = new long[longArrayList.size()]; + for (int j = 0; j < longArrayList.size(); j++) { + values[i][j] = longArrayList.getLong(j); + } + } else if (storedValue instanceof DoubleArrayList) { + DoubleArrayList doubleArrayList = (DoubleArrayList) storedValue; + values[i] = new long[doubleArrayList.size()]; + for (int j = 0; j < doubleArrayList.size(); j++) { + values[i][j] = (long) doubleArrayList.getDouble(j); + } + } else if (storedValue instanceof ObjectArrayList) { + ObjectArrayList list = (ObjectArrayList) storedValue; + values[i] = new long[list.size()]; + for (int j = 0; j < list.size(); j++) { + values[i][j] = Long.parseLong(list.get(j).toString()); + } } else { throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName()); } @@ -515,6 +567,30 @@ public float[][] getFloatValuesMV() { for (int j = 0; j < stringArray.length; j++) { values[i][j] = Float.parseFloat(stringArray[j]); } + } else if (storedValue instanceof IntArrayList) { + IntArrayList intArrayList = (IntArrayList) storedValue; + values[i] = new float[intArrayList.size()]; + for (int j = 0; j < intArrayList.size(); j++) { + values[i][j] = intArrayList.getInt(j); + } + } else if (storedValue instanceof LongArrayList) { + LongArrayList longArrayList = (LongArrayList) storedValue; + values[i] = new float[longArrayList.size()]; + for (int j = 0; j < longArrayList.size(); j++) { + values[i][j] = longArrayList.getLong(j); + } + } else if (storedValue instanceof DoubleArrayList) { + DoubleArrayList doubleArrayList = (DoubleArrayList) storedValue; + values[i] = new float[doubleArrayList.size()]; + for (int j = 0; j < doubleArrayList.size(); j++) { + values[i][j] = (float) doubleArrayList.getDouble(j); + } + } else if (storedValue instanceof ObjectArrayList) { + ObjectArrayList list = (ObjectArrayList) storedValue; + values[i] = new float[list.size()]; + for (int j = 0; j < list.size(); j++) { + values[i][j] = Float.parseFloat(list.get(j).toString()); + } } else { throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName()); } @@ -555,6 +631,30 @@ public double[][] getDoubleValuesMV() { for (int j = 0; j < stringArray.length; j++) { values[i][j] = Double.parseDouble(stringArray[j]); } + } else if (storedValue instanceof IntArrayList) { + IntArrayList intArrayList = (IntArrayList) storedValue; + values[i] = new double[intArrayList.size()]; + for (int j = 0; j < intArrayList.size(); j++) { + values[i][j] = intArrayList.getInt(j); + } + } else if (storedValue instanceof LongArrayList) { + LongArrayList longArrayList = (LongArrayList) storedValue; + values[i] = new double[longArrayList.size()]; + for (int j = 0; j < longArrayList.size(); j++) { + values[i][j] = longArrayList.getLong(j); + } + } else if (storedValue instanceof DoubleArrayList) { + DoubleArrayList doubleArrayList = (DoubleArrayList) storedValue; + values[i] = new double[doubleArrayList.size()]; + for (int j = 0; j < doubleArrayList.size(); j++) { + values[i][j] = doubleArrayList.getDouble(j); + } + } else if (storedValue instanceof ObjectArrayList) { + ObjectArrayList list = (ObjectArrayList) storedValue; + values[i] = new double[list.size()]; + for (int j = 0; j < list.size(); j++) { + values[i][j] = Double.parseDouble(list.get(j).toString()); + } } else { throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName()); } @@ -601,6 +701,30 @@ public String[][] getStringValuesMV() { } } else if (storedValue instanceof String[]) { values[i] = (String[]) storedValue; + } else if (storedValue instanceof IntArrayList) { + IntArrayList intArrayList = (IntArrayList) storedValue; + values[i] = new String[intArrayList.size()]; + for (int j = 0; j < intArrayList.size(); j++) { + values[i][j] = Integer.toString(intArrayList.getInt(j)); + } + } else if (storedValue instanceof LongArrayList) { + LongArrayList longArrayList = (LongArrayList) storedValue; + values[i] = new String[longArrayList.size()]; + for (int j = 0; j < longArrayList.size(); j++) { + values[i][j] = Long.toString(longArrayList.getLong(j)); + } + } else if (storedValue instanceof DoubleArrayList) { + DoubleArrayList doubleArrayList = (DoubleArrayList) storedValue; + values[i] = new String[doubleArrayList.size()]; + for (int j = 0; j < doubleArrayList.size(); j++) { + values[i][j] = Double.toString(doubleArrayList.getDouble(j)); + } + } else if (storedValue instanceof ObjectArrayList) { + ObjectArrayList list = (ObjectArrayList) storedValue; + values[i] = new String[list.size()]; + for (int j = 0; j < list.size(); j++) { + values[i][j] = list.get(j).toString(); + } } else { throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName()); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index e6ddb994d2fa..205f1ae71abf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -39,8 +39,10 @@ import org.apache.pinot.core.query.aggregation.function.array.SumArrayLongAggregationFunction; import org.apache.pinot.core.query.aggregation.function.funnel.FunnelCountAggregationFunctionFactory; import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelCompleteCountAggregationFunction; +import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelEventsFunctionEvalAggregationFunction; import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelMatchStepAggregationFunction; import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelMaxStepAggregationFunction; +import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelStepDurationStatsAggregationFunction; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.exception.BadQueryRequestException; @@ -467,6 +469,10 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio return new FunnelMatchStepAggregationFunction(arguments); case FUNNELCOMPLETECOUNT: return new FunnelCompleteCountAggregationFunction(arguments); + case FUNNELSTEPDURATIONSTATS: + return new FunnelStepDurationStatsAggregationFunction(arguments); + case FUNNELEVENTSFUNCTIONEVAL: + return new FunnelEventsFunctionEvalAggregationFunction(arguments); case FREQUENTSTRINGSSKETCH: return new FrequentStringsSketchAggregationFunction(arguments); case FREQUENTLONGSSKETCH: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelStepEventWithExtraFields.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelStepEventWithExtraFields.java new file mode 100644 index 000000000000..57eb8d17ee6d --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelStepEventWithExtraFields.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function.funnel; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + + +public class FunnelStepEventWithExtraFields implements Comparable { + protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final FunnelStepEvent _funnelStepEvent; + private final List _extraFields; + + public FunnelStepEventWithExtraFields(FunnelStepEvent funnelStepEvent, List extraFields) { + _funnelStepEvent = funnelStepEvent; + _extraFields = extraFields; + } + + public FunnelStepEventWithExtraFields(byte[] bytes) { + _funnelStepEvent = new FunnelStepEvent(Arrays.copyOf(bytes, FunnelStepEvent.SIZE_IN_BYTES)); + try { + _extraFields = OBJECT_MAPPER.readValue(bytes, 2, bytes.length, new TypeReference>() { + }); + } catch (IOException e) { + throw new RuntimeException("Caught exception while converting byte[] to FunnelStepEventWithExtraFields", e); + } + } + + public FunnelStepEvent getFunnelStepEvent() { + return _funnelStepEvent; + } + + public List getExtraFields() { + return _extraFields; + } + + @Override + public String toString() { + return "StepEventWithExtraFields{" + "funnelStepEvent=" + _funnelStepEvent + ", extraFields=" + _extraFields + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FunnelStepEventWithExtraFields stepEvent = (FunnelStepEventWithExtraFields) o; + + if (!_funnelStepEvent.equals(stepEvent.getFunnelStepEvent())) { + return false; + } + return _extraFields.equals(stepEvent.getExtraFields()); + } + + @Override + public int hashCode() { + int result = _funnelStepEvent.hashCode(); + result = 31 * result + _extraFields.hashCode(); + return result; + } + + @Override + public int compareTo(FunnelStepEventWithExtraFields o) { + return _funnelStepEvent.compareTo(o.getFunnelStepEvent()); + } + + public byte[] getBytes() { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); + try { + dataOutputStream.write(_funnelStepEvent.getBytes()); + dataOutputStream.write(OBJECT_MAPPER.writeValueAsBytes(_extraFields)); + dataOutputStream.close(); + } catch (Exception e) { + throw new RuntimeException("Caught exception while converting FunnelStepEvent to byte[]", e); + } + return byteArrayOutputStream.toByteArray(); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java index 52d04188490f..13b9b7c1afef 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.PriorityQueue; @@ -44,6 +45,7 @@ public abstract class FunnelBaseAggregationFunction protected final FunnelModes _modes = new FunnelModes(); protected final int _numSteps; protected long _maxStepDuration = 0L; + protected final Map _extraArguments = new HashMap<>(); public FunnelBaseAggregationFunction(List arguments) { int numArguments = arguments.size(); @@ -78,7 +80,8 @@ public FunnelBaseAggregationFunction(List arguments) { } break; default: - throw new IllegalArgumentException("Unrecognized arguments: " + extraArgument); + _extraArguments.put(key, parsedExtraArguments[1]); + break; } continue; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelEventsFunctionEvalAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelEventsFunctionEvalAggregationFunction.java new file mode 100644 index 000000000000..4c2c2a76bcb4 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelEventsFunctionEvalAggregationFunction.java @@ -0,0 +1,502 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function.funnel.window; + +import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.stream.Collectors; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEvent; +import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEventWithExtraFields; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +public class FunnelEventsFunctionEvalAggregationFunction + implements AggregationFunction, ObjectArrayList> { + protected final ExpressionContext _timestampExpression; + protected final long _windowSize; + protected final List _stepExpressions; + protected final FunnelBaseAggregationFunction.FunnelModes _modes = new FunnelBaseAggregationFunction.FunnelModes(); + protected final int _numSteps; + protected final int _numExtraFields; + protected final List _extraExpressions; + protected long _maxStepDuration = 0L; + + public FunnelEventsFunctionEvalAggregationFunction(List arguments) { + int numArguments = arguments.size(); + Preconditions.checkArgument(numArguments > 3, + "FUNNEL_EVENTS_FUNCTION_EVAL expects >= 4 arguments, got: %s. The function can be used as " + + getType().getName() + "(timestampExpression, windowSize, numberSteps, stepExpression, " + + "[stepExpression, ..], [mode, [mode, ... ]])", + numArguments); + _timestampExpression = arguments.get(0); + _windowSize = arguments.get(1).getLiteral().getLongValue(); + Preconditions.checkArgument(_windowSize > 0, "Window size must be > 0"); + _numSteps = arguments.get(2).getLiteral().getIntValue(); + Preconditions.checkArgument(numArguments >= 3 + _numSteps, + "FUNNEL_EVENTS_FUNCTION_EVAL expects >= " + (3 + _numSteps) + + " arguments, got: %s. The function can be used as " + + getType().getName() + "(timestampExpression, windowSize, numberSteps, stepExpression, " + + "[stepExpression, ..], [extraArgument/mode, [extraArgument/mode, ... ]])", + numArguments); + _stepExpressions = arguments.subList(3, 3 + _numSteps); + _numExtraFields = arguments.get(3 + _numSteps).getLiteral().getIntValue(); + Preconditions.checkArgument(numArguments >= 4 + _numSteps + _numExtraFields, + "FUNNEL_EVENTS_FUNCTION_EVAL expects >= " + (4 + _numSteps + _numExtraFields) + + " arguments, got: %s. The function can be used as " + + getType().getName() + "(timestampExpression, windowSize, numberSteps, stepExpression, " + + "[stepExpression, ..], [extraArgument/mode, [extraArgument/mode, ... ]])", + numArguments); + _extraExpressions = arguments.subList(4 + _numSteps, 4 + _numSteps + _numExtraFields); + + for (int i = 4 + _numSteps + _numExtraFields; i < numArguments; i++) { + String extraArgument = arguments.get(i).getLiteral().getStringValue().toUpperCase(); + String[] parsedExtraArguments = extraArgument.split("="); + if (parsedExtraArguments.length == 2) { + String key = parsedExtraArguments[0].toUpperCase(); + switch (key) { + case FunnelBaseAggregationFunction.FunnelConfigs.MAX_STEP_DURATION: + _maxStepDuration = Long.parseLong(parsedExtraArguments[1]); + Preconditions.checkArgument(_maxStepDuration > 0, "MaxStepDuration must be > 0"); + break; + case FunnelBaseAggregationFunction.FunnelConfigs.MODE: + for (String modeStr : parsedExtraArguments[1].split(",")) { + _modes.add(FunnelBaseAggregationFunction.Mode.valueOf(modeStr.trim())); + } + break; + default: + throw new IllegalArgumentException("Unrecognized arguments: " + extraArgument); + } + continue; + } + try { + _modes.add(FunnelBaseAggregationFunction.Mode.valueOf(extraArgument)); + } catch (Exception e) { + throw new RuntimeException("Unrecognized extra argument for funnel function: " + extraArgument, e); + } + } + } + + @Override + public String getResultColumnName() { + return + String.format("%s(%d)(%s,%s,%s)", getType().getName(), _windowSize, _timestampExpression.toString(), + _stepExpressions.stream().map(ExpressionContext::toString).collect(Collectors.joining(",")), + (_numExtraFields > 0 ? ", " + _extraExpressions.stream().map(ExpressionContext::toString) + .collect(Collectors.joining(",")) : "") + ); + } + + @Override + public List getInputExpressions() { + List inputs = new ArrayList<>(1 + _numSteps + _numExtraFields); + inputs.add(_timestampExpression); + inputs.addAll(_stepExpressions); + inputs.addAll(_extraExpressions); + return inputs; + } + + @Override + public AggregationResultHolder createAggregationResultHolder() { + return new ObjectAggregationResultHolder(); + } + + @Override + public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) { + return new ObjectGroupByResultHolder(initialCapacity, maxCapacity); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map blockValSetMap) { + long[] timestampBlock = blockValSetMap.get(_timestampExpression).getLongValuesSV(); + List stepBlocks = new ArrayList<>(_numSteps); + for (ExpressionContext stepExpression : _stepExpressions) { + stepBlocks.add(blockValSetMap.get(stepExpression).getIntValuesSV()); + } + PriorityQueue stepEvents = aggregationResultHolder.getResult(); + if (stepEvents == null) { + stepEvents = new PriorityQueue<>(); + aggregationResultHolder.setValue(stepEvents); + } + List extraFieldsBlocks = getExtraFieldsBlocks(blockValSetMap); + for (int i = 0; i < length; i++) { + boolean stepFound = false; + for (int j = 0; j < _numSteps; j++) { + if (stepBlocks.get(j)[i] == 1) { + List extraFields = extractExtraFields(extraFieldsBlocks, i); + stepEvents.add(new FunnelStepEventWithExtraFields(new FunnelStepEvent(timestampBlock[i], j), extraFields)); + stepFound = true; + break; + } + } + // If the mode is KEEP_ALL and no step is found, add a dummy step event with step -1 + if (_modes.hasKeepAll() && !stepFound) { + List extraFields = extractExtraFields(extraFieldsBlocks, i); + stepEvents.add(new FunnelStepEventWithExtraFields(new FunnelStepEvent(timestampBlock[i], -1), extraFields)); + } + } + } + + private List getExtraFieldsBlocks(Map blockValSetMap) { + List extraFieldsBlocks = new ArrayList<>(_numExtraFields); + for (ExpressionContext extraExpression : _extraExpressions) { + BlockValSet blockValSet = blockValSetMap.get(extraExpression); + switch (blockValSet.getValueType()) { + case INT: + extraFieldsBlocks.add(blockValSet.getIntValuesSV()); + break; + case LONG: + case TIMESTAMP: + extraFieldsBlocks.add(blockValSet.getLongValuesSV()); + break; + case FLOAT: + extraFieldsBlocks.add(blockValSet.getFloatValuesSV()); + break; + case DOUBLE: + extraFieldsBlocks.add(blockValSet.getDoubleValuesSV()); + break; + case STRING: + extraFieldsBlocks.add(blockValSet.getStringValuesSV()); + break; + default: + throw new IllegalArgumentException("Unsupported data type for extra field: " + extraExpression + " - " + + blockValSet.getValueType()); + } + } + return extraFieldsBlocks; + } + + private List extractExtraFields(List extraFieldsBlocks, int i) { + List extraFields = new ArrayList<>(_numExtraFields); + for (Object extraFieldsBlock : extraFieldsBlocks) { + switch (extraFieldsBlock.getClass().getComponentType().getSimpleName()) { + case "int": + extraFields.add(((int[]) extraFieldsBlock)[i]); + break; + case "long": + extraFields.add(((long[]) extraFieldsBlock)[i]); + break; + case "float": + extraFields.add(((float[]) extraFieldsBlock)[i]); + break; + case "double": + extraFields.add(((double[]) extraFieldsBlock)[i]); + break; + case "String": + extraFields.add(((String[]) extraFieldsBlock)[i]); + break; + default: + throw new IllegalArgumentException( + "Unsupported data type for extra field: " + extraFieldsBlock.getClass().getComponentType() + .getSimpleName()); + } + } + return extraFields; + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map blockValSetMap) { + long[] timestampBlock = blockValSetMap.get(_timestampExpression).getLongValuesSV(); + List stepBlocks = new ArrayList<>(_numSteps); + for (ExpressionContext stepExpression : _stepExpressions) { + stepBlocks.add(blockValSetMap.get(stepExpression).getIntValuesSV()); + } + List extraFieldsBlocks = getExtraFieldsBlocks(blockValSetMap); + for (int i = 0; i < length; i++) { + int groupKey = groupKeyArray[i]; + boolean stepFound = false; + for (int j = 0; j < _numSteps; j++) { + if (stepBlocks.get(j)[i] == 1) { + PriorityQueue stepEvents = getFunnelStepEvents(groupByResultHolder, groupKey); + List extraFields = extractExtraFields(extraFieldsBlocks, i); + stepEvents.add(new FunnelStepEventWithExtraFields(new FunnelStepEvent(timestampBlock[i], j), extraFields)); + stepFound = true; + break; + } + } + // If the mode is KEEP_ALL and no step is found, add a dummy step event with step -1 + if (_modes.hasKeepAll() && !stepFound) { + PriorityQueue stepEvents = getFunnelStepEvents(groupByResultHolder, groupKey); + List extraFields = extractExtraFields(extraFieldsBlocks, i); + stepEvents.add(new FunnelStepEventWithExtraFields(new FunnelStepEvent(timestampBlock[i], -1), extraFields)); + } + } + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map blockValSetMap) { + long[] timestampBlock = blockValSetMap.get(_timestampExpression).getLongValuesSV(); + List stepBlocks = new ArrayList<>(_numSteps); + for (ExpressionContext stepExpression : _stepExpressions) { + stepBlocks.add(blockValSetMap.get(stepExpression).getIntValuesSV()); + } + List extraFieldsBlocks = getExtraFieldsBlocks(blockValSetMap); + for (int i = 0; i < length; i++) { + int[] groupKeys = groupKeysArray[i]; + boolean stepFound = false; + for (int j = 0; j < _numSteps; j++) { + if (stepBlocks.get(j)[i] == 1) { + for (int groupKey : groupKeys) { + PriorityQueue stepEvents = + getFunnelStepEvents(groupByResultHolder, groupKey); + List extraFields = extractExtraFields(extraFieldsBlocks, i); + stepEvents.add(new FunnelStepEventWithExtraFields(new FunnelStepEvent(timestampBlock[i], j), extraFields)); + } + stepFound = true; + break; + } + } + // If the mode is KEEP_ALL and no step is found, add a dummy step event with step -1 + if (_modes.hasKeepAll() && !stepFound) { + for (int groupKey : groupKeys) { + PriorityQueue stepEvents = getFunnelStepEvents(groupByResultHolder, groupKey); + List extraFields = extractExtraFields(extraFieldsBlocks, i); + stepEvents.add(new FunnelStepEventWithExtraFields(new FunnelStepEvent(timestampBlock[i], -1), extraFields)); + } + } + } + } + + private static PriorityQueue getFunnelStepEvents( + GroupByResultHolder groupByResultHolder, + int groupKey) { + PriorityQueue stepEvents = groupByResultHolder.getResult(groupKey); + if (stepEvents == null) { + stepEvents = new PriorityQueue<>(); + groupByResultHolder.setValueForKey(groupKey, stepEvents); + } + return stepEvents; + } + + @Override + public PriorityQueue extractAggregationResult( + AggregationResultHolder aggregationResultHolder) { + return aggregationResultHolder.getResult(); + } + + @Override + public PriorityQueue extractGroupByResult(GroupByResultHolder groupByResultHolder, + int groupKey) { + return groupByResultHolder.getResult(groupKey); + } + + @Override + public PriorityQueue merge( + PriorityQueue intermediateResult1, + PriorityQueue intermediateResult2) { + if (intermediateResult1 == null) { + return intermediateResult2; + } + if (intermediateResult2 == null) { + return intermediateResult1; + } + intermediateResult1.addAll(intermediateResult2); + return intermediateResult1; + } + + @Override + public DataSchema.ColumnDataType getIntermediateResultColumnType() { + return DataSchema.ColumnDataType.STRING_ARRAY; + } + + /** + * Fill the sliding window with the events that fall into the window. + * Note that the events from stepEvents are dequeued and added to the sliding window. + * This method ensure the first event from the sliding window is the first step event. + * + * @param stepEvents The priority queue of step events + * @param slidingWindow The sliding window with events that fall into the window + */ + protected void fillWindow(PriorityQueue stepEvents, + ArrayDeque slidingWindow) { + // Ensure for the sliding window, the first event is the first step + while ((!slidingWindow.isEmpty()) && slidingWindow.peek().getFunnelStepEvent().getStep() != 0) { + slidingWindow.pollFirst(); + } + if (slidingWindow.isEmpty()) { + while (!stepEvents.isEmpty() && stepEvents.peek().getFunnelStepEvent().getStep() != 0) { + stepEvents.poll(); + } + if (stepEvents.isEmpty()) { + return; + } + slidingWindow.addLast(stepEvents.poll()); + } + // SlidingWindow is not empty + long windowStart = slidingWindow.peek().getFunnelStepEvent().getTimestamp(); + long windowEnd = windowStart + _windowSize; + while (!stepEvents.isEmpty() && (stepEvents.peek().getFunnelStepEvent().getTimestamp() < windowEnd)) { + if (_maxStepDuration > 0) { + // When maxStepDuration > 0, we need to check if the event_to_add has a timestamp within the max duration + // from the last event in the sliding window. If not, we break the loop. + if (stepEvents.peek().getFunnelStepEvent().getTimestamp() - slidingWindow.getLast().getFunnelStepEvent() + .getTimestamp() > _maxStepDuration) { + break; + } + } + slidingWindow.addLast(stepEvents.poll()); + } + } + + @Override + public String toExplainString() { + //@formatter:off + return getType().getName() + "{" + + "timestampExpression=" + _timestampExpression + + ", windowSize=" + _windowSize + + ", stepExpressions=" + _stepExpressions + + ", extraExpressions=" + _extraExpressions + + '}'; + //@formatter:on + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.FUNNELEVENTSFUNCTIONEVAL; + } + + @Override + public DataSchema.ColumnDataType getFinalResultColumnType() { + return DataSchema.ColumnDataType.STRING_ARRAY; + } + + @Override + public ObjectArrayList extractFinalResult(PriorityQueue stepEvents) { + ObjectArrayList finalResults = new ObjectArrayList<>(); + List> matchedFunnelEventsExtraFields = new ArrayList<>(_numExtraFields); + if (stepEvents == null || stepEvents.isEmpty()) { + return finalResults; + } + ArrayDeque slidingWindow = new ArrayDeque<>(); + while (!stepEvents.isEmpty()) { + fillWindow(stepEvents, slidingWindow); + if (slidingWindow.isEmpty()) { + break; + } + + long windowStart = slidingWindow.peek().getFunnelStepEvent().getTimestamp(); + + int maxStep = 0; + long previousTimestamp = -1; + for (FunnelStepEventWithExtraFields event : slidingWindow) { + int currentEventStep = event.getFunnelStepEvent().getStep(); + // If the same condition holds for the sequence of events, then such repeating event interrupts further + // processing. + if (_modes.hasStrictDeduplication()) { + if (currentEventStep == maxStep - 1) { + maxStep = 0; + } + } + // Don't allow interventions of other events. E.g. in the case of A->B->D->C, it stops finding A->B->C at the D + // and the max event level is 2. + if (_modes.hasStrictOrder()) { + if (currentEventStep != maxStep) { + maxStep = 0; + } + } + // Apply conditions only to events with strictly increasing timestamps. + if (_modes.hasStrictIncrease()) { + if (previousTimestamp == event.getFunnelStepEvent().getTimestamp()) { + continue; + } + } + previousTimestamp = event.getFunnelStepEvent().getTimestamp(); + if (maxStep == currentEventStep) { + maxStep++; + } + if (maxStep == _numSteps) { + matchedFunnelEventsExtraFields.add(extractFunnelEventsExtraFields(slidingWindow)); + maxStep = 0; + windowStart = event.getFunnelStepEvent().getTimestamp(); + } + } + if (!slidingWindow.isEmpty()) { + slidingWindow.pollFirst(); + } + // sliding window should pop until current event: + while (!slidingWindow.isEmpty() && slidingWindow.peek().getFunnelStepEvent().getTimestamp() < windowStart) { + slidingWindow.pollFirst(); + } + } + + evalFunctionOnMatchedFunnelEvents(matchedFunnelEventsExtraFields, finalResults); + return finalResults; + } + + private void evalFunctionOnMatchedFunnelEvents(List> matchedFunnelEventsExtraFields, + ObjectArrayList finalResults) { + StringBuilder arrayAssignments = new StringBuilder(Integer.toString(matchedFunnelEventsExtraFields.size())); + for (List funnelEventsExtraField : matchedFunnelEventsExtraFields) { + arrayAssignments.append(", ").append(funnelEventsExtraField.size() * _numExtraFields); + } + finalResults.add(arrayAssignments.toString()); + for (List matchedFunnelEventsExtraField : matchedFunnelEventsExtraFields) { + for (Object[] extraFields : matchedFunnelEventsExtraField) { + for (Object extraField : extraFields) { + finalResults.add(extraField.toString()); + } + } + } + } + + private List extractFunnelEventsExtraFields(ArrayDeque slidingWindow) { + List results = new ArrayList<>(); + int step = 0; + for (FunnelStepEventWithExtraFields event : slidingWindow) { + if (event.getFunnelStepEvent().getStep() == step) { + Object[] extraFields = new Object[_numExtraFields]; + List extraFieldsList = event.getExtraFields(); + for (int i = 0; i < _numExtraFields; i++) { + extraFields[i] = extraFieldsList.get(i); + } + results.add(extraFields); + step++; + } + } + return results; + } + + @Override + public ObjectArrayList mergeFinalResult(ObjectArrayList finalResult1, + ObjectArrayList finalResult2) { + if (finalResult1 == null) { + return finalResult2; + } + if (finalResult2 == null) { + return finalResult1; + } + finalResult1.addAll(finalResult2); + return finalResult1; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelStepDurationStatsAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelStepDurationStatsAggregationFunction.java new file mode 100644 index 000000000000..ef585e0583ea --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelStepDurationStatsAggregationFunction.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function.funnel.window; + +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEvent; +import org.apache.pinot.segment.local.aggregator.AvgValueAggregator; +import org.apache.pinot.segment.local.aggregator.PercentileEstValueAggregator; +import org.apache.pinot.segment.local.customobject.AvgPair; +import org.apache.pinot.segment.local.customobject.QuantileDigest; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.utils.CommonConstants; + + +public class FunnelStepDurationStatsAggregationFunction extends FunnelBaseAggregationFunction { + + private static final AvgValueAggregator AVG_VALUE_AGGREGATOR = new AvgValueAggregator(); + private static final PercentileEstValueAggregator PERCENTILE_EST_VALUE_AGGREGATOR = + new PercentileEstValueAggregator(); + + private final List _durationFunctions = new ArrayList<>(); + private boolean _canSkipNonMatchedFunnel = true; + + public FunnelStepDurationStatsAggregationFunction(List arguments) { + super(arguments); + if (_extraArguments.get("DURATIONFUNCTIONS") != null) { + String[] durationFunctions = _extraArguments.get("DURATIONFUNCTIONS").split(","); + for (String durationFunction : durationFunctions) { + String functionName = durationFunction.trim().toUpperCase(); + if (functionName.equals("AVG") || functionName.equals("MEDIAN") || functionName.equals("MIN") + || functionName.equals("MAX")) { + _durationFunctions.add(functionName); + } else if (functionName.equals("COUNT")) { + _canSkipNonMatchedFunnel = false; + _durationFunctions.add(functionName); + } else if (functionName.startsWith("PERCENTILE")) { + try { + double quantile = Double.parseDouble(functionName.substring("PERCENTILE".length())) / 100.0; + if (quantile < 0 || quantile > 1) { + throw new IllegalArgumentException("Invalid percentile value: " + quantile); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid percentile function name: " + functionName + ", must be " + + "PERCENTILE followed by a double value between 0 and 100"); + } + _durationFunctions.add(functionName); + } else { + throw new IllegalArgumentException("Unsupported duration function: " + functionName); + } + } + } else { + throw new IllegalArgumentException( + "Duration functions must be provided for FunnelStepDurationStatsAggregationFunction"); + } + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.FUNNELSTEPDURATIONSTATS; + } + + @Override + public DataSchema.ColumnDataType getFinalResultColumnType() { + return DataSchema.ColumnDataType.DOUBLE_ARRAY; + } + + @Override + public DoubleArrayList extractFinalResult(PriorityQueue stepEvents) { + if (stepEvents == null || stepEvents.isEmpty()) { + return new DoubleArrayList(); + } + Map> stepValueAggregators = initValueAggregator(); + boolean hasMatchedFunnel = false; + ArrayDeque slidingWindow = new ArrayDeque<>(); + while (!stepEvents.isEmpty()) { + fillWindow(stepEvents, slidingWindow); + if (slidingWindow.isEmpty()) { + break; + } + int maxSteps = processWindow(slidingWindow); + if (maxSteps == _numSteps) { + applyStepDurations(stepValueAggregators, slidingWindow); + hasMatchedFunnel = true; + } else { + // Add counts for not completed funnels + for (int i = 0; i < maxSteps; i++) { + List objects = stepValueAggregators.get(i); + for (Object count : objects) { + if (count instanceof AtomicInteger) { + ((AtomicInteger) count).set(1); + } + } + } + } + if (!slidingWindow.isEmpty()) { + slidingWindow.pollFirst(); + } + } + if (_canSkipNonMatchedFunnel && !hasMatchedFunnel) { + return new DoubleArrayList(); + } + return getStepDurationResults(stepValueAggregators, hasMatchedFunnel); + } + + private void applyStepDurations(Map> stepAggregatorValues, + ArrayDeque slidingWindow) { + List stepTimestamp = new ArrayList<>(); + for (FunnelStepEvent event : slidingWindow) { + int step = event.getStep(); + if (stepTimestamp.size() <= step) { + stepTimestamp.add(event.getTimestamp()); + } + } + for (int i = 0; i < stepTimestamp.size() - 1; i++) { + long duration = stepTimestamp.get(i + 1) - stepTimestamp.get(i); + for (Object stepAggregatorValue : stepAggregatorValues.get(i)) { + if (stepAggregatorValue instanceof AtomicInteger) { + ((AtomicInteger) stepAggregatorValue).set(1); + } else if (stepAggregatorValue instanceof AvgPair) { + AVG_VALUE_AGGREGATOR.applyRawValue((AvgPair) stepAggregatorValue, duration); + } else if (stepAggregatorValue instanceof QuantileDigest) { + PERCENTILE_EST_VALUE_AGGREGATOR.applyRawValue((QuantileDigest) stepAggregatorValue, duration); + } + } + } + if (stepAggregatorValues.get(_numSteps - 1) != null) { + for (Object stepAggregatorValue : stepAggregatorValues.get(_numSteps - 1)) { + if (stepAggregatorValue instanceof AtomicInteger) { + ((AtomicInteger) stepAggregatorValue).set(1); + } + } + } + } + + private Map> initValueAggregator() { + Map> stepValueAggregators = new HashMap<>(); + for (int step = 0; step < _numSteps; step++) { + List valueAggregators = new ArrayList<>(); + valueAggregators.add(new AtomicInteger(0)); + valueAggregators.add(new AvgPair()); + valueAggregators.add(new QuantileDigest(0)); + stepValueAggregators.put(step, valueAggregators); + } + return stepValueAggregators; + } + + private DoubleArrayList getStepDurationResults(Map> valueAggregatorResults, + boolean hasMatchedFunnel) { + DoubleArrayList result = new DoubleArrayList(_durationFunctions.size() * (_numSteps - 1)); + for (int step = 0; step < _numSteps; step++) { + AtomicReference avgPair = new AtomicReference<>(); + AtomicReference quantileDigest = new AtomicReference<>(); + AtomicInteger count = new AtomicInteger(); + valueAggregatorResults.get(step).forEach(valueAggregator -> { + if (valueAggregator instanceof AvgPair) { + avgPair.set((AvgPair) valueAggregator); + } + if (valueAggregator instanceof QuantileDigest) { + quantileDigest.set((QuantileDigest) valueAggregator); + } + if (valueAggregator instanceof AtomicInteger) { + count.set(((AtomicInteger) valueAggregator).intValue()); + } + }); + for (int i = 0; i < _durationFunctions.size(); i++) { + String durationFunction = _durationFunctions.get(i); + if (durationFunction.equals("COUNT")) { + result.add(count.get()); + continue; + } + if (!hasMatchedFunnel || step == _numSteps - 1) { + result.add(CommonConstants.NullValuePlaceHolder.DOUBLE); + continue; + } + if (durationFunction.equals("AVG")) { + result.add(avgPair.get().getSum() / avgPair.get().getCount()); + } else if (durationFunction.equals("MEDIAN")) { + result.add(quantileDigest.get().getQuantile(0.5)); + } else if (durationFunction.equals("MIN")) { + result.add(quantileDigest.get().getQuantile(0)); + } else if (durationFunction.equals("MAX")) { + result.add(quantileDigest.get().getQuantile(1)); + } else if (durationFunction.startsWith("PERCENTILE")) { + double quantile = Double.parseDouble(durationFunction.substring("PERCENTILE".length())) / 100.0; + result.add(quantileDigest.get().getQuantile(quantile)); + } + } + } + return result; + } + + protected Integer processWindow(ArrayDeque slidingWindow) { + int maxStep = 0; + long previousTimestamp = -1; + for (FunnelStepEvent event : slidingWindow) { + int currentEventStep = event.getStep(); + // If the same condition holds for the sequence of events, then such repeating event interrupts further + // processing. + if (_modes.hasStrictDeduplication()) { + if (currentEventStep == maxStep - 1) { + return maxStep; + } + } + // Don't allow interventions of other events. E.g. in the case of A->B->D->C, it stops finding A->B->C at the D + // and the max event level is 2. + if (_modes.hasStrictOrder()) { + if (currentEventStep != maxStep) { + return maxStep; + } + } + // Apply conditions only to events with strictly increasing timestamps. + if (_modes.hasStrictIncrease()) { + if (previousTimestamp == event.getTimestamp()) { + continue; + } + } + if (maxStep == currentEventStep) { + maxStep++; + previousTimestamp = event.getTimestamp(); + } + if (maxStep == _numSteps) { + break; + } + } + return maxStep; + } + + @Override + public DoubleArrayList mergeFinalResult(DoubleArrayList finalResult1, DoubleArrayList finalResult2) { + if (finalResult1 == null) { + return finalResult2; + } + return finalResult1; + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java index a318e5698c49..0c4cc22a51fc 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java @@ -894,6 +894,157 @@ public void testFunnelCompleteCountGroupByQueriesSkipLeaf(boolean useMultiStageQ } } + @Test(dataProvider = "useBothQueryEngines") + public void testFunnelEventsFunctionEvalGroupByQueries(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "userId, funnelEventsFunctionEval(timestampCol, '1000', 4, " + + "url = '/product/search', " + + "url = '/cart/add', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation', " + + "3, timestampCol, userId, url" + + ") " + + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); + JsonNode jsonNode = postQuery(query); + System.out.println("query = " + query); + System.out.println("jsonNode = " + jsonNode); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + System.out.println(rows); + assertEquals(rows.size(), 40); + for (int i = 0; i < rows.size(); i++) { + JsonNode row = rows.get(i); + System.out.println("row = " + row); + //assertEquals(row.size(), 2); + //assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + switch (i / 10) { + case 0: + //assertEquals(row.get(1).intValue(), 1); + break; + case 1: + //assertEquals(row.get(1).intValue(), 0); + break; + case 2: + //assertEquals(row.get(1).intValue(), 0); + break; + case 3: + //assertEquals(row.get(1).intValue(), 0); + break; + default: + throw new IllegalStateException(); + } + } + } + + @Test(dataProvider = "useBothQueryEngines") + public void testFunnelStepDurationStatsGroupByQueries(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "userId, funnelStepDurationStats(timestampCol, '1000', 4, " + + "url = '/product/search', " + + "url = '/cart/add', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation', " + + "'durationFunctions=count,avg,min,median,percentile95,max' " + + ") as statsArray " + + "FROM %s GROUP BY userId HAVING arrayLengthDouble(statsArray) > 0 ORDER BY userId LIMIT %d", + getTableName(), getCountStarResult()); + JsonNode jsonNode = postQuery(query); + System.out.println("query = " + query); + System.out.println("jsonNode = " + jsonNode); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + System.out.println(rows); + //assertEquals(rows.size(), 40); + for (int i = 0; i < rows.size(); i++) { + JsonNode row = rows.get(i); + System.out.println("row = " + row); + //assertEquals(row.size(), 2); + //assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + switch (i / 10) { + case 0: + //assertEquals(row.get(1).intValue(), 1); + break; + case 1: + //assertEquals(row.get(1).intValue(), 0); + break; + case 2: + //assertEquals(row.get(1).intValue(), 0); + break; + case 3: + //assertEquals(row.get(1).intValue(), 0); + break; + default: + throw new IllegalStateException(); + } + } + } + + + @Test(dataProvider = "useV2QueryEngine") + public void testFunnelStepDurationStatsGroupByQueries2(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("WITH durationStats AS (SELECT " + + "/*+ aggOptions( is_partitioned_by_group_by_keys = 'true') */ " + + "userId, funnelStepDurationStats(timestampCol, '1000', 4, " + + "url = '/product/search', " + + "url = '/cart/add', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation', " + + "'durationFunctions=count,avg,median' " + + ") as stats " + + "FROM %s " + + "GROUP BY userId) " + + "SELECT " + + "sum(arrayElementAtDouble(stats, 1)) AS count_step0, " + + "AVG(arrayElementAtDouble(stats, 2)) AS avg_avg_step0_to_step1, " + + "AVG(arrayElementAtDouble(stats, 3)) AS avg_median_step0_to_step1, " + + "sum(arrayElementAtDouble(stats, 4)) AS count_step1, " + + "AVG(arrayElementAtDouble(stats, 5)) AS avg_avg_step1_to_step2, " + + "AVG(arrayElementAtDouble(stats, 6)) AS avg_median_step1_to_step2, " + + "sum(arrayElementAtDouble(stats, 7)) AS count_step2, " + + "AVG(arrayElementAtDouble(stats, 8)) AS avg_avg_step2_to_step3, " + + "AVG(arrayElementAtDouble(stats, 9)) AS avg_median_step2_to_step3, " + + "sum(arrayElementAtDouble(stats, 10)) AS count_step3 " + + "FROM durationStats " + + "OPTION(useMultistageEngine=true, numGroupsLimit=2000000, timeoutMs=1800000, " + + "serverReturnFinalResult=true, numThreadsForFinalReduce=4)", + getTableName()); + JsonNode jsonNode = postQuery(query); + System.out.println("query = " + query); + System.out.println("jsonNode = " + jsonNode); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + System.out.println(rows); + //assertEquals(rows.size(), 40); + for (int i = 0; i < rows.size(); i++) { + JsonNode row = rows.get(i); + System.out.println("row = " + row); + //assertEquals(row.size(), 2); + //assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + switch (i / 10) { + case 0: + //assertEquals(row.get(1).intValue(), 1); + break; + case 1: + //assertEquals(row.get(1).intValue(), 0); + break; + case 2: + //assertEquals(row.get(1).intValue(), 0); + break; + case 3: + //assertEquals(row.get(1).intValue(), 0); + break; + default: + throw new IllegalStateException(); + } + } + } + @Override public String getTableName() { return DEFAULT_TABLE_NAME; diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java index 74e7a135b45b..04e4038e0fa9 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java @@ -180,8 +180,13 @@ public enum AggregationFunctionType { // funnel aggregate functions FUNNELMAXSTEP("funnelMaxStep", ReturnTypes.INTEGER, OperandTypes.VARIADIC, SqlTypeName.OTHER), FUNNELCOMPLETECOUNT("funnelCompleteCount", ReturnTypes.INTEGER, OperandTypes.VARIADIC, SqlTypeName.OTHER), + FUNNELSTEPDURATIONSTATS("funnelStepDurationStats", new ArrayReturnTypeInference(SqlTypeName.DOUBLE), + OperandTypes.VARIADIC, SqlTypeName.OTHER), FUNNELMATCHSTEP("funnelMatchStep", new ArrayReturnTypeInference(SqlTypeName.INTEGER), OperandTypes.VARIADIC, SqlTypeName.OTHER), + FUNNELEVENTSFUNCTIONEVAL("funnelEventsFunctionEval", new ArrayReturnTypeInference(SqlTypeName.VARCHAR), + OperandTypes.VARIADIC, + SqlTypeName.OTHER), FUNNELCOUNT("funnelCount", new ArrayReturnTypeInference(SqlTypeName.BIGINT), OperandTypes.VARIADIC, SqlTypeName.OTHER),