Commit

added orderby native optimization
mario-arduini committed Dec 16, 2020
1 parent d1044b8 commit 679155a
Showing 1 changed file with 70 additions and 0 deletions.
@@ -37,9 +37,12 @@
import org.rumbledb.exceptions.RumbleException;
import org.rumbledb.exceptions.UnexpectedTypeException;
import org.rumbledb.expressions.ExecutionMode;
import org.rumbledb.expressions.flowr.FLWOR_CLAUSES;
import org.rumbledb.expressions.flowr.OrderByClause;
import org.rumbledb.runtime.RuntimeIterator;
import org.rumbledb.runtime.RuntimeTupleIterator;
import org.rumbledb.runtime.flwor.FlworDataFrameUtils;
import org.rumbledb.runtime.flwor.NativeClauseContext;
import org.rumbledb.runtime.flwor.expression.OrderByClauseAnnotatedChildIterator;
import org.rumbledb.runtime.flwor.udfs.OrderClauseCreateColumnsUDF;
import org.rumbledb.runtime.flwor.udfs.OrderClauseDetermineTypeUDF;
@@ -247,6 +250,20 @@ public Dataset<Row> getDataFrame(
null
);

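// try to push the whole order-by clause down to Spark SQL as a native query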
Dataset<Row> nativeQueryResult = tryNativeQuery(
df,
this.expressionsWithIterator,
allColumns,
inputSchema,
context
);
if (nativeQueryResult != null) {
return nativeQueryResult;
}

// native execution was not possible, fall back to the order-by UDF
System.out.println("using UDF");

df.sparkSession()
.udf()
.register(
@@ -484,4 +501,57 @@ public Map<Name, DynamicContext.VariableDependency> getProjection(
}
return projection;
}

/**
* Try to generate a native SQL query for the order by clause and run it. If successful, return the resulting
* dataframe; otherwise return null.
*
* @param dataFrame input dataframe for the query
* @param expressionsWithIterator list of ordering iterators
* @param allColumns other columns required in following clauses
* @param inputSchema input schema of the dataframe
* @param context current dynamic context of the dataframe
* @return resulting dataframe of the order by clause if successful, null otherwise
*/
public static Dataset<Row> tryNativeQuery(
Dataset<Row> dataFrame,
List<OrderByClauseAnnotatedChildIterator> expressionsWithIterator,
List<String> allColumns,
StructType inputSchema,
DynamicContext context
) {
// the try-catch block is required because some queries are not supported by Spark SQL,
// such as using a field to decide which field to access (e.g. $i.($i.fieldToUse))
try {
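// context describing the ORDER BY clause, built from the input schema and the current dynamic context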
NativeClauseContext orderContext = new NativeClauseContext(FLWOR_CLAUSES.ORDER_BY, inputSchema, context);
StringBuilder orderSql = new StringBuilder();
String orderSeparator = "";
NativeClauseContext nativeQuery;
for (OrderByClauseAnnotatedChildIterator orderIterator : expressionsWithIterator) {
nativeQuery = orderIterator.getIterator().generateNativeQuery(orderContext);
if (nativeQuery == NativeClauseContext.NoNativeQuery) {
return null;
}
orderSql.append(orderSeparator);
orderSeparator = ", ";
orderSql.append(nativeQuery.getResultingQuery());
if (!orderIterator.isAscending()) {
orderSql.append(" desc");
}
}

System.out.println("native query returned: " + orderSql);
String selectSQL = FlworDataFrameUtils.getSQLProjection(allColumns, false);
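// expose the input dataframe as a temporary view named "input" so the generated SQL can select from it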
dataFrame.createOrReplaceTempView("input");
return dataFrame.sparkSession()
.sql(
String.format(
"select %s from input order by %s",
selectSQL,
orderSql
)
);
} catch (Exception e) {
return null;
}
}
}

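For illustration only, here is a minimal standalone sketch (not part of the commit) of how the order-by fragment assembled in tryNativeQuery ends up looking, assuming two hypothetical sort keys, "country" ascending and "population" descending, and a wildcard projection:

import java.util.Arrays;
import java.util.List;

// Minimal sketch with hypothetical sort keys; mirrors the string-building
// loop in tryNativeQuery without requiring a Spark session.
public class OrderBySqlSketch {
    public static void main(String[] args) {
        List<String[]> sortKeys = Arrays.asList(
            new String[] { "country", "asc" },
            new String[] { "population", "desc" }
        );
        StringBuilder orderSql = new StringBuilder();
        String orderSeparator = "";
        for (String[] key : sortKeys) {
            orderSql.append(orderSeparator);
            orderSeparator = ", ";
            orderSql.append(key[0]);
            if ("desc".equals(key[1])) {
                orderSql.append(" desc");
            }
        }
        // Prints: select * from input order by country, population desc
        System.out.println(String.format("select %s from input order by %s", "*", orderSql));
    }
}

The real method additionally registers the input dataframe as a temporary view named "input" and submits the formatted statement through sparkSession().sql(...), returning null whenever any ordering expression cannot be translated to a native query.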