Skip to content

Commit

Permalink
MSQ WF: Pass a flag from broker to determine operator chain transformation (#17443)
Browse files Browse the repository at this point in the history
  • Loading branch information
Akshat-Jain authored Nov 12, 2024
1 parent ae049a4 commit 3f56b57
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)

final QueryContext queryContext = querySpec.getQuery().context();
final QueryDefinition queryDef = makeQueryDefinition(
context.makeQueryKitSpec(makeQueryControllerToolKit(), queryId, querySpec, queryKernelConfig),
context.makeQueryKitSpec(makeQueryControllerToolKit(queryContext), queryId, querySpec, queryKernelConfig),
querySpec,
context,
resultsContext
Expand Down Expand Up @@ -1211,13 +1211,19 @@ private Int2ObjectMap<Object> makeWorkerFactoryInfosForStage(
}

@SuppressWarnings("rawtypes")
private QueryKit<Query<?>> makeQueryControllerToolKit()
private QueryKit<Query<?>> makeQueryControllerToolKit(QueryContext queryContext)
{
final Map<Class<? extends Query>, QueryKit> kitMap =
ImmutableMap.<Class<? extends Query>, QueryKit>builder()
.put(ScanQuery.class, new ScanQueryKit(context.jsonMapper()))
.put(GroupByQuery.class, new GroupByQueryKit(context.jsonMapper()))
.put(WindowOperatorQuery.class, new WindowOperatorQueryKit(context.jsonMapper()))
.put(
WindowOperatorQuery.class,
new WindowOperatorQueryKit(
context.jsonMapper(),
MultiStageQueryContext.isWindowFunctionOperatorTransformationEnabled(queryContext)
)
)
.build();

return new MultiQueryKit(kitMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,9 @@
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
import org.apache.druid.query.operator.AbstractSortOperatorFactory;
import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory;
import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.operator.Operator;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.PartitionSortOperatorFactory;
import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
Expand Down Expand Up @@ -101,7 +97,7 @@ public WindowOperatorQueryFrameProcessor(
this.frameWriterFactory = frameWriterFactory;
this.resultRowAndCols = new ArrayList<>();
this.maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(queryContext);
this.operatorFactoryList = getOperatorFactoryListForStageDefinition(operatorFactoryList);
this.operatorFactoryList = operatorFactoryList;
this.frameRowsAndColsBuilder = new RowsAndColumnsBuilder(this.maxRowsMaterialized);

this.frameReader = frameReader;
Expand Down Expand Up @@ -403,36 +399,4 @@ private void clearRACBuffers()
resultRowAndCols.clear();
rowId.set(0);
}

/**
 * Rewrites the operator chain received from the native plan into the MSQ form.
 * A native chain of (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator)
 * becomes (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator). MSQ's
 * shuffling already clusters rows on the partitioning keys at every stage, so the processor
 * can blindly read N rows from the input channel and push them through this chain, repeating
 * until the input channel is exhausted.
 *
 * @param operatorFactoryListFromQuery operator factories as planned by the native layer
 * @return the rewritten operator factory list for this stage definition
 */
private List<OperatorFactory> getOperatorFactoryListForStageDefinition(List<OperatorFactory> operatorFactoryListFromQuery)
{
  final List<OperatorFactory> rewritten = new ArrayList<>();
  final List<OperatorFactory> bufferedSorts = new ArrayList<>();

  for (final OperatorFactory factory : operatorFactoryListFromQuery) {
    if (factory instanceof AbstractPartitioningOperatorFactory) {
      // Swap any partitioning operator for the gluing variant, keyed on the same columns.
      final AbstractPartitioningOperatorFactory partitioning = (AbstractPartitioningOperatorFactory) factory;
      rewritten.add(new GlueingPartitioningOperatorFactory(partitioning.getPartitionColumns(), this.maxRowsMaterialized));
    } else if (factory instanceof AbstractSortOperatorFactory) {
      // Defer sorts; they are emitted as PartitionSortOperators just before the next window operator.
      final AbstractSortOperatorFactory sort = (AbstractSortOperatorFactory) factory;
      bufferedSorts.add(new PartitionSortOperatorFactory(sort.getSortColumns()));
    } else {
      // Window (or other) operator: flush the deferred sorts so they run directly ahead of it.
      rewritten.addAll(bufferedSorts);
      bufferedSorts.clear();
      rewritten.add(factory);
    }
  }

  // Flush any trailing sorts that were not followed by another operator.
  rewritten.addAll(bufferedSorts);
  return rewritten;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
import org.apache.druid.query.operator.AbstractSortOperatorFactory;
import org.apache.druid.query.operator.ColumnWithDirection;
import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.PartitionSortOperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.operator.window.WindowOperatorFactory;
import org.apache.druid.segment.column.ColumnType;
Expand All @@ -54,10 +56,12 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
{
private static final Logger log = new Logger(WindowOperatorQueryKit.class);
private final ObjectMapper jsonMapper;
private final boolean isOperatorTransformationEnabled;

public WindowOperatorQueryKit(ObjectMapper jsonMapper)
/**
 * @param jsonMapper                        mapper used when building stage definitions
 * @param isOperatorTransformationEnabled   whether the native operator chain should be
 *                                          transformed into the MSQ-specific form before being
 *                                          handed to the frame processors; driven by the
 *                                          broker-supplied query-context flag (kept off by
 *                                          default for backward compatibility during rolling
 *                                          upgrades)
 */
public WindowOperatorQueryKit(ObjectMapper jsonMapper, boolean isOperatorTransformationEnabled)
{
this.jsonMapper = jsonMapper;
this.isOperatorTransformationEnabled = isOperatorTransformationEnabled;
}

@Override
Expand Down Expand Up @@ -172,6 +176,9 @@ public QueryDefinition makeQueryDefinition(
.flatMap(of -> of.getPartitionColumns().stream())
.collect(Collectors.toList());

final List<OperatorFactory> operatorFactories = isOperatorTransformationEnabled
? getTransformedOperatorFactoryListForStageDefinition(operatorList.get(i), maxRowsMaterialized)
: operatorList.get(i);

queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + i)
Expand All @@ -181,7 +188,7 @@ public QueryDefinition makeQueryDefinition(
.shuffleSpec(nextShuffleSpec)
.processorFactory(new WindowOperatorQueryFrameProcessorFactory(
queryToRun,
operatorList.get(i),
operatorFactories,
stageRowSignature,
maxRowsMaterialized,
partitionColumnNames
Expand Down Expand Up @@ -325,4 +332,40 @@ private static RowSignature computeSignatureForFinalWindowStage(RowSignature row
finalWindowClusterBy.getColumns()
);
}

/**
 * Converts the operator chain received from the native plan into the MSQ plan.
 * (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into
 * (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator). We rely on MSQ's
 * shuffling to cluster rows on the partitioning keys at every stage, which allows the frame
 * processor to blindly read N rows from its input channel and push them into the operator
 * chain, repeating until the input channel is finished.
 *
 * @param operatorFactoryListFromQuery operator factories as planned by the native layer
 * @param maxRowsMaterializedInWindow  row-materialization limit handed to the gluing partitioner
 * @return the transformed operator factory list for this stage definition
 */
private List<OperatorFactory> getTransformedOperatorFactoryListForStageDefinition(
    List<OperatorFactory> operatorFactoryListFromQuery,
    int maxRowsMaterializedInWindow
)
{
  final List<OperatorFactory> transformed = new ArrayList<>();
  final List<OperatorFactory> pendingSorts = new ArrayList<>();

  for (final OperatorFactory factory : operatorFactoryListFromQuery) {
    if (factory instanceof AbstractPartitioningOperatorFactory) {
      // Replace any partitioning operator with the gluing variant, keyed on the same columns.
      final AbstractPartitioningOperatorFactory partitioning = (AbstractPartitioningOperatorFactory) factory;
      transformed.add(new GlueingPartitioningOperatorFactory(partitioning.getPartitionColumns(), maxRowsMaterializedInWindow));
    } else if (factory instanceof AbstractSortOperatorFactory) {
      // Defer sorts; they are emitted as PartitionSortOperators just before the next window operator.
      final AbstractSortOperatorFactory sort = (AbstractSortOperatorFactory) factory;
      pendingSorts.add(new PartitionSortOperatorFactory(sort.getSortColumns()));
    } else {
      // Window (or other) operator: flush the deferred sorts so they run directly ahead of it.
      transformed.addAll(pendingSorts);
      pendingSorts.clear();
      transformed.add(factory);
    }
  }

  // Flush any trailing sorts that were not followed by another operator.
  transformed.addAll(pendingSorts);
  return transformed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ public static MSQSpec makeQuerySpec(
// Add appropriate finalization to native query context.
nativeQueryContextOverrides.put(QueryContexts.FINALIZE_KEY, finalizeAggregations);

// This flag is to ensure backward compatibility, as brokers are upgraded after indexers/middlemanagers.
nativeQueryContextOverrides.put(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, true);

final MSQSpec querySpec =
MSQSpec.builder()
.query(druidQuery.getQuery().withOverriddenContext(nativeQueryContextOverrides))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ public class MultiStageQueryContext

public static final String MAX_ROWS_MATERIALIZED_IN_WINDOW = "maxRowsMaterializedInWindow";

// This flag ensures backward compatibility and will be removed in Druid 33, with the default behavior as enabled.
public static final String WINDOW_FUNCTION_OPERATOR_TRANSFORMATION = "windowFunctionOperatorTransformation";

public static final String CTX_SKIP_TYPE_VERIFICATION = "skipTypeVerification";

/**
Expand All @@ -217,6 +220,14 @@ public static int getMaxRowsMaterializedInWindow(final QueryContext queryContext
);
}

/**
 * Returns whether the window-function operator-chain transformation is enabled for this query.
 * Reads {@link #WINDOW_FUNCTION_OPERATOR_TRANSFORMATION} from the query context, defaulting to
 * {@code false} so that workers behave compatibly when the flag is absent (e.g. during a
 * rolling upgrade where the broker has not yet been upgraded to set it).
 *
 * @param queryContext the query context to inspect
 * @return the context flag's value, or {@code false} when unset
 */
public static boolean isWindowFunctionOperatorTransformationEnabled(final QueryContext queryContext)
{
  return queryContext.getBoolean(WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, false);
}

public static int getMaxConcurrentStagesWithDefault(
final QueryContext queryContext,
final int defaultMaxConcurrentStages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 2)
.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0)
.put(MSQTaskQueryMaker.USER_KEY, "allowAll")
.put(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, true)
.build();

public static final Map<String, Object> DURABLE_STORAGE_MSQ_CONTEXT =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ order by 1;
"maxParseExceptions" : 0,
"plannerStrategy" : "DECOUPLED",
"sqlQueryId" : __SQL_QUERY_ID__
"sqlStringifyArrays" : false
"sqlStringifyArrays" : false,
"windowFunctionOperatorTransformation" : true
}
}
},
Expand Down Expand Up @@ -201,7 +202,8 @@ order by 1;
"maxParseExceptions" : 0,
"plannerStrategy" : "DECOUPLED",
"sqlQueryId" : __SQL_QUERY_ID__
"sqlStringifyArrays" : false
"sqlStringifyArrays" : false,
"windowFunctionOperatorTransformation" : true
}
}
},
Expand Down

0 comments on commit 3f56b57

Please sign in to comment.