diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java index 01d62a5c3480..568457e8c3dd 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java @@ -1051,15 +1051,20 @@ public void lastByFirstByTest() { } @Test - public void maxByMinByTest() { - String[] expectedHeader1 = buildHeaders(10); + public void maxByMinByExtremeTest() { + expectedHeader = buildHeaders(10); sql = "select max_by(time,floatnum),min_by(time,floatnum),max_by(time,date),min_by(time,date),max_by(time,floatnum),min_by(time,floatnum),max_by(time,ts),min_by(time,ts),max_by(time,stringv),min_by(time,stringv) from table0"; retArray = new String[] { "1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.040Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.040Z,1971-01-01T00:00:10.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:00:01.000Z,", }; - tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME); + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + + expectedHeader = buildHeaders(3); + sql = "select extreme(num),extreme(bignum),extreme(floatnum) from table0"; + retArray = new String[] {"15,3147483648,4654.231,"}; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); } // ================================================================== diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java index 55a2c1e28a56..0dc87039ecbf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java @@ -150,6 +150,8 @@ public static TableAccumulator createBuiltinAccumulator( return new TableMaxByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1)); case MIN_BY: return new TableMinByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1)); + case EXTREME: + return new ExtremeAccumulator(inputDataTypes.get(0)); default: throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ExtremeAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ExtremeAccumulator.java new file mode 100644 index 000000000000..4b8cba3d73ee --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ExtremeAccumulator.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.utils.RamUsageEstimator; +import org.apache.tsfile.utils.TsPrimitiveType; +import org.apache.tsfile.write.UnSupportedDataTypeException; + +public class ExtremeAccumulator implements TableAccumulator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(ExtremeAccumulator.class); + private final TSDataType seriesDataType; + private final TsPrimitiveType extremeResult; + private boolean initResult; + + public ExtremeAccumulator(TSDataType seriesDataType) { + this.seriesDataType = seriesDataType; + this.extremeResult = TsPrimitiveType.getByType(seriesDataType); + } + + @Override + public long getEstimatedSize() { + return INSTANCE_SIZE; + } + + @Override + public TableAccumulator copy() { + return new ExtremeAccumulator(seriesDataType); + } + + @Override + public void addInput(Column[] arguments) { + switch (seriesDataType) { + case INT32: + addIntInput(arguments[0]); + return; + case INT64: + addLongInput(arguments[0]); + return; + case FLOAT: + addFloatInput(arguments[0]); + return; + case DOUBLE: + addDoubleInput(arguments[0]); + return; + case TEXT: + case STRING: + case BLOB: + case BOOLEAN: + case DATE: + case TIMESTAMP: + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in Extreme: %s", seriesDataType)); + } + } + + // partialResult should be like: | PartialExtremeValue | + @Override + public void addIntermediate(Column argument) { + for (int i = 0; i < argument.getPositionCount(); i++) { + if (argument.isNull(i)) { + continue; + } + + switch (seriesDataType) { + case INT32: + updateIntResult(argument.getInt(i)); + break; + case INT64: + updateLongResult(argument.getLong(i)); + break; + case FLOAT: + updateFloatResult(argument.getFloat(i)); + break; + case DOUBLE: + updateDoubleResult(argument.getDouble(i)); + break; + case TEXT: + case STRING: + case BLOB: + case BOOLEAN: + case DATE: + case TIMESTAMP: + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in Extreme: %s", seriesDataType)); + } + } + } + + @Override + public void addStatistics(Statistics[] statistics) { + if (statistics == null || statistics[0] == null) { + return; + } + + switch (seriesDataType) { + case INT32: + updateIntResult((int) statistics[0].getMaxValue()); + updateIntResult((int) statistics[0].getMinValue()); + break; + case INT64: + updateLongResult((long) statistics[0].getMaxValue()); + updateLongResult((long) statistics[0].getMinValue()); + break; + case FLOAT: + updateFloatResult((float) statistics[0].getMaxValue()); + updateFloatResult((float) statistics[0].getMinValue()); + break; + case DOUBLE: + updateDoubleResult((double) statistics[0].getMaxValue()); + updateDoubleResult((double) statistics[0].getMinValue()); + break; + case TEXT: + case STRING: + case BLOB: + case BOOLEAN: + case DATE: + case TIMESTAMP: + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in Extreme: %s", seriesDataType)); + } + } + + // columnBuilder should be single in ExtremeAccumulator + @Override + public void evaluateIntermediate(ColumnBuilder columnBuilder) { + if (!initResult) { + columnBuilder.appendNull(); + return; + } + + switch (seriesDataType) { + case INT32: + columnBuilder.writeInt(extremeResult.getInt()); + break; + case INT64: + columnBuilder.writeLong(extremeResult.getLong()); + break; + case FLOAT: + columnBuilder.writeFloat(extremeResult.getFloat()); + break; + case DOUBLE: + columnBuilder.writeDouble(extremeResult.getDouble()); + break; + case TEXT: + case STRING: + case BLOB: + case BOOLEAN: + case DATE: + case TIMESTAMP: + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in Extreme: %s", seriesDataType)); + } + } + + @Override + public void evaluateFinal(ColumnBuilder columnBuilder) { + if (!initResult) { + columnBuilder.appendNull(); + return; + } + + switch (seriesDataType) { + case INT32: + columnBuilder.writeInt(extremeResult.getInt()); + break; + case INT64: + columnBuilder.writeLong(extremeResult.getLong()); + break; + case FLOAT: + columnBuilder.writeFloat(extremeResult.getFloat()); + break; + case DOUBLE: + columnBuilder.writeDouble(extremeResult.getDouble()); + break; + case TEXT: + case STRING: + case BLOB: + case BOOLEAN: + case DATE: + case TIMESTAMP: + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in Extreme: %s", seriesDataType)); + } + } + + @Override + public void reset() { + initResult = false; + extremeResult.reset(); + } + + @Override + public boolean hasFinalResult() { + return false; + } + + private void addIntInput(Column column) { + for (int i = 0; i < column.getPositionCount(); i++) { + if (!column.isNull(i)) { + updateIntResult(column.getInt(i)); + } + } + } + + private void updateIntResult(int val) { + int absExtVal = Math.abs(val); + int candidateResult = extremeResult.getInt(); + int absCandidateResult = Math.abs(extremeResult.getInt()); + + if (!initResult + || (absExtVal > absCandidateResult) + || (absExtVal == absCandidateResult) && val > candidateResult) { + initResult = true; + extremeResult.setInt(val); + } + } + + private void addLongInput(Column column) { + for (int i = 0; i < column.getPositionCount(); i++) { + if (!column.isNull(i)) { + updateLongResult(column.getLong(i)); + } + } + } + + private void updateLongResult(long val) { + long absExtVal = Math.abs(val); + long candidateResult = extremeResult.getLong(); + long absCandidateResult = Math.abs(extremeResult.getLong()); + + if (!initResult + || (absExtVal > absCandidateResult) + || (absExtVal == absCandidateResult) && val > candidateResult) { + initResult = true; + extremeResult.setLong(val); + } + } + + private void addFloatInput(Column column) { + for (int i = 0; i < column.getPositionCount(); i++) { + if (!column.isNull(i)) { + updateFloatResult(column.getFloat(i)); + } + } + } + + private void updateFloatResult(float val) { + float absExtVal = Math.abs(val); + float candidateResult = extremeResult.getFloat(); + float absCandidateResult = Math.abs(extremeResult.getFloat()); + + if (!initResult + || (absExtVal > absCandidateResult) + || (absExtVal == absCandidateResult) && val > candidateResult) { + initResult = true; + extremeResult.setFloat(val); + } + } + + private void addDoubleInput(Column column) { + for (int i = 0; i < column.getPositionCount(); i++) { + if (!column.isNull(i)) { + updateDoubleResult(column.getDouble(i)); + } + } + } + + private void updateDoubleResult(double val) { + double absExtVal = Math.abs(val); + double candidateResult = extremeResult.getDouble(); + double absCandidateResult = Math.abs(extremeResult.getDouble()); + + if (!initResult + || (absExtVal > absCandidateResult) + || (absExtVal == absCandidateResult) && val > candidateResult) { + initResult = true; + extremeResult.setDouble(val); + } + } +}