Skip to content

Commit

Permalink
add extreme
Browse files Browse the repository at this point in the history
  • Loading branch information
Beyyes committed Oct 19, 2024
1 parent e3de1b7 commit 7859ad2
Show file tree
Hide file tree
Showing 3 changed files with 317 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1051,15 +1051,20 @@ public void lastByFirstByTest() {
}

@Test
public void maxByMinByTest() {
String[] expectedHeader1 = buildHeaders(10);
public void maxByMinByExtremeTest() {
expectedHeader = buildHeaders(10);
sql =
"select max_by(time,floatnum),min_by(time,floatnum),max_by(time,date),min_by(time,date),max_by(time,floatnum),min_by(time,floatnum),max_by(time,ts),min_by(time,ts),max_by(time,stringv),min_by(time,stringv) from table0";
retArray =
new String[] {
"1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.040Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.040Z,1971-01-01T00:00:10.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:00:01.000Z,",
};
tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);

expectedHeader = buildHeaders(3);
sql = "select extreme(num),extreme(bignum),extreme(floatnum) from table0";
retArray = new String[] {"15,3147483648,4654.231,"};
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
}

// ==================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ public static TableAccumulator createBuiltinAccumulator(
return new TableMaxByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1));
case MIN_BY:
return new TableMinByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1));
case EXTREME:
return new ExtremeAccumulator(inputDataTypes.get(0));
default:
throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;

import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.UnSupportedDataTypeException;

public class ExtremeAccumulator implements TableAccumulator {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(ExtremeAccumulator.class);
private final TSDataType seriesDataType;
private final TsPrimitiveType extremeResult;
private boolean initResult;

public ExtremeAccumulator(TSDataType seriesDataType) {
this.seriesDataType = seriesDataType;
this.extremeResult = TsPrimitiveType.getByType(seriesDataType);
}

@Override
public long getEstimatedSize() {
return INSTANCE_SIZE;
}

@Override
public TableAccumulator copy() {
return new ExtremeAccumulator(seriesDataType);
}

@Override
public void addInput(Column[] arguments) {
switch (seriesDataType) {
case INT32:
addIntInput(arguments[0]);
return;
case INT64:
addLongInput(arguments[0]);
return;
case FLOAT:
addFloatInput(arguments[0]);
return;
case DOUBLE:
addDoubleInput(arguments[0]);
return;
case TEXT:
case STRING:
case BLOB:
case BOOLEAN:
case DATE:
case TIMESTAMP:
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type in Extreme: %s", seriesDataType));
}
}

// partialResult should be like: | PartialExtremeValue |
@Override
public void addIntermediate(Column argument) {
for (int i = 0; i < argument.getPositionCount(); i++) {
if (argument.isNull(i)) {
continue;
}

switch (seriesDataType) {
case INT32:
updateIntResult(argument.getInt(i));
break;
case INT64:
updateLongResult(argument.getLong(i));
break;
case FLOAT:
updateFloatResult(argument.getFloat(i));
break;
case DOUBLE:
updateDoubleResult(argument.getDouble(i));
break;
case TEXT:
case STRING:
case BLOB:
case BOOLEAN:
case DATE:
case TIMESTAMP:
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type in Extreme: %s", seriesDataType));
}
}
}

@Override
public void addStatistics(Statistics[] statistics) {
if (statistics == null || statistics[0] == null) {
return;
}

switch (seriesDataType) {
case INT32:
updateIntResult((int) statistics[0].getMaxValue());
updateIntResult((int) statistics[0].getMinValue());
break;
case INT64:
updateLongResult((long) statistics[0].getMaxValue());
updateLongResult((long) statistics[0].getMinValue());
break;
case FLOAT:
updateFloatResult((float) statistics[0].getMaxValue());
updateFloatResult((float) statistics[0].getMinValue());
break;
case DOUBLE:
updateDoubleResult((double) statistics[0].getMaxValue());
updateDoubleResult((double) statistics[0].getMinValue());
break;
case TEXT:
case STRING:
case BLOB:
case BOOLEAN:
case DATE:
case TIMESTAMP:
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type in Extreme: %s", seriesDataType));
}
}

// columnBuilder should be single in ExtremeAccumulator
@Override
public void evaluateIntermediate(ColumnBuilder columnBuilder) {
if (!initResult) {
columnBuilder.appendNull();
return;
}

switch (seriesDataType) {
case INT32:
columnBuilder.writeInt(extremeResult.getInt());
break;
case INT64:
columnBuilder.writeLong(extremeResult.getLong());
break;
case FLOAT:
columnBuilder.writeFloat(extremeResult.getFloat());
break;
case DOUBLE:
columnBuilder.writeDouble(extremeResult.getDouble());
break;
case TEXT:
case STRING:
case BLOB:
case BOOLEAN:
case DATE:
case TIMESTAMP:
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type in Extreme: %s", seriesDataType));
}
}

@Override
public void evaluateFinal(ColumnBuilder columnBuilder) {
if (!initResult) {
columnBuilder.appendNull();
return;
}

switch (seriesDataType) {
case INT32:
columnBuilder.writeInt(extremeResult.getInt());
break;
case INT64:
columnBuilder.writeLong(extremeResult.getLong());
break;
case FLOAT:
columnBuilder.writeFloat(extremeResult.getFloat());
break;
case DOUBLE:
columnBuilder.writeDouble(extremeResult.getDouble());
break;
case TEXT:
case STRING:
case BLOB:
case BOOLEAN:
case DATE:
case TIMESTAMP:
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type in Extreme: %s", seriesDataType));
}
}

@Override
public void reset() {
initResult = false;
extremeResult.reset();
}

@Override
public boolean hasFinalResult() {
return false;
}

private void addIntInput(Column column) {
for (int i = 0; i < column.getPositionCount(); i++) {
if (!column.isNull(i)) {
updateIntResult(column.getInt(i));
}
}
}

private void updateIntResult(int val) {
int absExtVal = Math.abs(val);
int candidateResult = extremeResult.getInt();
int absCandidateResult = Math.abs(extremeResult.getInt());

if (!initResult
|| (absExtVal > absCandidateResult)
|| (absExtVal == absCandidateResult) && val > candidateResult) {
initResult = true;
extremeResult.setInt(val);
}
}

private void addLongInput(Column column) {
for (int i = 0; i < column.getPositionCount(); i++) {
if (!column.isNull(i)) {
updateLongResult(column.getLong(i));
}
}
}

private void updateLongResult(long val) {
long absExtVal = Math.abs(val);
long candidateResult = extremeResult.getLong();
long absCandidateResult = Math.abs(extremeResult.getLong());

if (!initResult
|| (absExtVal > absCandidateResult)
|| (absExtVal == absCandidateResult) && val > candidateResult) {
initResult = true;
extremeResult.setLong(val);
}
}

private void addFloatInput(Column column) {
for (int i = 0; i < column.getPositionCount(); i++) {
if (!column.isNull(i)) {
updateFloatResult(column.getFloat(i));
}
}
}

private void updateFloatResult(float val) {
float absExtVal = Math.abs(val);
float candidateResult = extremeResult.getFloat();
float absCandidateResult = Math.abs(extremeResult.getFloat());

if (!initResult
|| (absExtVal > absCandidateResult)
|| (absExtVal == absCandidateResult) && val > candidateResult) {
initResult = true;
extremeResult.setFloat(val);
}
}

private void addDoubleInput(Column column) {
for (int i = 0; i < column.getPositionCount(); i++) {
if (!column.isNull(i)) {
updateDoubleResult(column.getDouble(i));
}
}
}

private void updateDoubleResult(double val) {
double absExtVal = Math.abs(val);
double candidateResult = extremeResult.getDouble();
double absCandidateResult = Math.abs(extremeResult.getDouble());

if (!initResult
|| (absExtVal > absCandidateResult)
|| (absExtVal == absCandidateResult) && val > candidateResult) {
initResult = true;
extremeResult.setDouble(val);
}
}
}

0 comments on commit 7859ad2

Please sign in to comment.