HIVE-28725: Sorting is performed when order by position is disabled when CBO is enabled
kasakrisz committed Jan 28, 2025
1 parent 36608b4 commit d4f0053
Showing 5 changed files with 595 additions and 11 deletions.
38 changes: 27 additions & 11 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -5405,6 +5405,9 @@ OrderByRelBuilder addSortByKeys(ASTNode obAST) throws SemanticException {
      ASTNode ref = (ASTNode) nullObASTExpr.getChild(0);

      int fieldIndex = genSortByKey(ref);
+     if (fieldIndex < 0) {
+       continue;
+     }

      // 2.4 Determine the Direction of order by
      RelFieldCollation.Direction order = RelFieldCollation.Direction.DESCENDING;
@@ -5448,6 +5451,7 @@ private int genSortByKey(ASTNode ref) throws SemanticException {
          LOG.warn("Using constant number {}" +
              " in order by. If you try to use position alias when hive.orderby.position.alias is false, " +
              "the position alias will be ignored.", ref.getText());
+         return -1;
        }
      } else {
        // 2.2 Convert ExprNode to RexNode
@@ -5465,8 +5469,6 @@ private int genSortByKey(ASTNode ref) throws SemanticException {
          return fieldIndex;
        }
      }
-
-     return 0;
    }

    private RexNode getOrderByExpression(
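
Read together, the hunks above change genSortByKey's contract: instead of falling through to "return 0;" (which silently produced a sort on the first column), an ignored constant position alias now yields -1, and every caller skips the key. The following is a minimal standalone sketch of that contract, not the Hive code itself; resolveSortKey and collectSortKeys are hypothetical stand-ins for genSortByKey and its caller loops:

import java.util.ArrayList;
import java.util.List;

public class PositionAliasSketch {

  // Stand-in for genSortByKey: resolves an ORDER BY key to a 0-based column
  // index, or returns -1 when the key is a constant position alias that must
  // be ignored because hive.orderby.position.alias is disabled.
  static int resolveSortKey(String key, List<String> columns, boolean positionAliasEnabled) {
    if (!key.isEmpty() && key.chars().allMatch(Character::isDigit)) {
      if (!positionAliasEnabled) {
        return -1; // sentinel: the caller must drop this key entirely
      }
      return Integer.parseInt(key) - 1; // position aliases are 1-based
    }
    return columns.indexOf(key);
  }

  // Stand-in for the caller loops in addSortByKeys / addRelDistribution:
  // the new fieldIndex < 0 guard drops ignored keys instead of letting them
  // degenerate into a sort on column 0, as the old "return 0;" did.
  static List<Integer> collectSortKeys(List<String> keys, List<String> columns,
      boolean positionAliasEnabled) {
    List<Integer> indexes = new ArrayList<>();
    for (String key : keys) {
      int fieldIndex = resolveSortKey(key, columns, positionAliasEnabled);
      if (fieldIndex < 0) {
        continue;
      }
      indexes.add(fieldIndex);
    }
    return indexes;
  }

  public static void main(String[] args) {
    List<String> columns = List.of("a", "b", "c");
    // Alias disabled: "2" is dropped, only column b survives -> [1]
    System.out.println(collectSortKeys(List.of("2", "b"), columns, false));
    // Alias enabled: "2" resolves to the second column -> [1, 1]
    System.out.println(collectSortKeys(List.of("2", "b"), columns, true));
  }
}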
@@ -5520,16 +5522,21 @@ OrderByRelBuilder addRelDistribution(ASTNode distributeByAST) throws SemanticExc
      for (int i = 0; i < distributeByAST.getChildCount(); ++i) {
        ASTNode keyAST = (ASTNode) distributeByAST.getChild(i);
        int fieldIndex = genSortByKey(keyAST);
-       keys.add(fieldIndex);
+       if (fieldIndex >= 0) {
+         keys.add(fieldIndex);
+       }
      }
+     ImmutableList<Integer> keyList = keys.build();
+     if (!keyList.isEmpty()) {
+       hiveRelDistribution = new HiveRelDistribution(RelDistribution.Type.HASH_DISTRIBUTED, keyList);
+       return this;
+     }
+   }
-     hiveRelDistribution = new HiveRelDistribution(RelDistribution.Type.HASH_DISTRIBUTED, keys.build());
-   } else {
-     // In case of SORT BY we do not need Distribution
-     // but the instance RelDistributions.ANY can not be used here because
-     // org.apache.calcite.rel.core.Exchange has
-     // assert distribution != RelDistributions.ANY;
-     hiveRelDistribution = new HiveRelDistribution(RelDistribution.Type.ANY, RelDistributions.ANY.getKeys());
-   }
+   // In case of SORT BY we do not need Distribution
+   // but the instance RelDistributions.ANY can not be used here because
+   // org.apache.calcite.rel.core.Exchange has
+   // assert distribution != RelDistributions.ANY;
+   hiveRelDistribution = new HiveRelDistribution(RelDistribution.Type.ANY, RelDistributions.ANY.getKeys());
    return this;
  }
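
The comment retained in this hunk points at a Calcite subtlety worth spelling out: org.apache.calcite.rel.core.Exchange asserts distribution != RelDistributions.ANY, an identity comparison against the shared singleton, so a freshly constructed distribution whose type is ANY still passes. The toy model below illustrates only that constraint; Distribution, Exchange, and SINGLETON_ANY are illustrative stand-ins, not the Calcite classes:

public class AnyDistributionSketch {

  enum Type { ANY, HASH_DISTRIBUTED }

  // Stand-in for RelDistribution / HiveRelDistribution.
  static final class Distribution {
    final Type type;
    Distribution(Type type) { this.type = type; }
  }

  // Stand-in for the RelDistributions.ANY singleton instance.
  static final Distribution SINGLETON_ANY = new Distribution(Type.ANY);

  // Stand-in for org.apache.calcite.rel.core.Exchange: the assert is a
  // reference comparison, not a check on the distribution's type.
  static final class Exchange {
    Exchange(Distribution distribution) {
      assert distribution != SINGLETON_ANY : "Exchange rejects the ANY singleton";
    }
  }

  public static void main(String[] args) {
    // A distinct instance with type ANY is accepted -- the trick
    // addRelDistribution uses for SORT BY without DISTRIBUTE BY.
    new Exchange(new Distribution(Type.ANY));
    // new Exchange(SINGLETON_ANY); // AssertionError when run with -ea
  }
}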

@@ -5599,6 +5606,10 @@ private void genOBProject() throws SemanticException {
    RelNode sortLimit(RexNode offsetRN, RexNode fetchRN) throws SemanticException {
      genOBProject();

+     if (fieldCollations.isEmpty()) {
+       return endGenOBLogicalPlan(obInputRel);
+     }
+
      // 4. Construct SortRel
      RelOptCluster cluster = calcitePlannerAction.cluster;
      RelTraitSet traitSet = cluster.traitSetOf(HiveRelNode.CONVENTION);
@@ -5610,13 +5621,18 @@ RelNode sortLimit(RexNode offsetRN, RexNode fetchRN) throws SemanticException {
    RelNode sortExchange() throws SemanticException {
      genOBProject();

+     if (fieldCollations.isEmpty() && hiveRelDistribution.getKeys().isEmpty()) {
+       return endGenOBLogicalPlan(obInputRel);
+     }
+
      RelCollation canonizedCollation = RelCollations.of(fieldCollations);
      ImmutableList.Builder<RexNode> builder = ImmutableList.builder();
      for (RelFieldCollation relFieldCollation : canonizedCollation.getFieldCollations()) {
        int index = relFieldCollation.getFieldIndex();
        builder.add(calcitePlannerAction.cluster.getRexBuilder().makeInputRef(obInputRel, index));
      }
-     RelNode sortRel = HiveSortExchange.create(obInputRel, hiveRelDistribution, canonizedCollation, builder.build());
+     ImmutableList<RexNode> keys = builder.build();
+     RelNode sortRel = HiveSortExchange.create(obInputRel, hiveRelDistribution, canonizedCollation, keys);
      return endGenOBLogicalPlan(sortRel);
    }
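
Both builder methods now short-circuit before constructing a Sort or Exchange node: if every requested key was dropped as an ignored position alias, there is nothing left to sort or distribute on, so the builder hands back the unsorted input. A compact sketch of the guard, where Plan and the field names are hypothetical stand-ins for the RelNode plumbing:

import java.util.List;

public class EmptyKeyGuardSketch {

  // Stand-in for a RelNode subtree.
  record Plan(String description) {}

  private final List<Integer> fieldCollations;  // surviving sort keys
  private final List<Integer> distributionKeys; // surviving distribute-by keys
  private final Plan input;                     // stand-in for obInputRel

  EmptyKeyGuardSketch(List<Integer> fieldCollations, List<Integer> distributionKeys, Plan input) {
    this.fieldCollations = fieldCollations;
    this.distributionKeys = distributionKeys;
    this.input = input;
  }

  // Mirrors sortLimit: no collations means no Sort node at all.
  Plan sortLimit() {
    if (fieldCollations.isEmpty()) {
      return input;
    }
    return new Plan("HiveSortLimit over " + input.description());
  }

  // Mirrors sortExchange: skip the Exchange when neither a collation nor a
  // distribution key survived the position-alias filtering.
  Plan sortExchange() {
    if (fieldCollations.isEmpty() && distributionKeys.isEmpty()) {
      return input;
    }
    return new Plan("HiveSortExchange over " + input.description());
  }

  public static void main(String[] args) {
    Plan scan = new Plan("TableScan(t1)");
    // "distribute by 2" with aliases disabled drops every key:
    System.out.println(new EmptyKeyGuardSketch(List.of(), List.of(), scan)
        .sortExchange().description()); // TableScan(t1)
    // "distribute by 1, b" still keeps column b:
    System.out.println(new EmptyKeyGuardSketch(List.of(), List.of(1), scan)
        .sortExchange().description()); // HiveSortExchange over TableScan(t1)
  }
}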

@@ -0,0 +1,21 @@
create table t1 (a string, b int, c int);

-- distribute by
explain cbo
select * from t1 distribute by 2;
explain
select * from t1 distribute by 2;

-- distribute by and sort by
explain cbo
select * from t1 distribute by 1, b sort by 2;

explain
select * from t1 distribute by 1, b sort by 2;

-- cluster by
explain cbo
select * from t1 cluster by 1, b;

explain
select * from t1 cluster by 1, b;
@@ -0,0 +1,36 @@
-- When hive.orderby.position.alias is disabled, a constant integer key is expected to be ignored: no sort or shuffle operation should be generated for it.
set hive.orderby.position.alias=false;

create table t1 (a string, b int, c int);

-- order by
explain cbo
select * from t1 order by 2, 3;
explain
select * from t1 order by 2, 3;

-- distribute by
explain cbo
select * from t1 distribute by 2;
explain
select * from t1 distribute by 2;

-- distribute by and sort by
explain cbo
select * from t1 distribute by 1, b sort by 2;

explain
select * from t1 distribute by 1, b sort by 2;

-- cluster by
explain cbo
select * from t1 cluster by 1;

explain
select * from t1 cluster by 1;

explain cbo
select * from t1 cluster by 1, b;

explain
select * from t1 cluster by 1, b;
@@ -0,0 +1,235 @@
PREHOOK: query: create table t1 (a string, b int, c int)
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@t1
POSTHOOK: query: create table t1 (a string, b int, c int)
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@t1
PREHOOK: query: explain cbo
select * from t1 distribute by 2
PREHOOK: type: QUERY
PREHOOK: Input: default@t1
#### A masked pattern was here ####
POSTHOOK: query: explain cbo
select * from t1 distribute by 2
POSTHOOK: type: QUERY
POSTHOOK: Input: default@t1
#### A masked pattern was here ####
CBO PLAN:
HiveSortExchange(distribution=[hash[1]], collation=[[]])
HiveProject(a=[$0], b=[$1], c=[$2])
HiveTableScan(table=[[default, t1]], table:alias=[t1])

PREHOOK: query: explain
select * from t1 distribute by 2
PREHOOK: type: QUERY
PREHOOK: Input: default@t1
#### A masked pattern was here ####
POSTHOOK: query: explain
select * from t1 distribute by 2
POSTHOOK: type: QUERY
POSTHOOK: Input: default@t1
#### A masked pattern was here ####
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Tez
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: t1
Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: a (type: string), b (type: int), c (type: int)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
null sort order:
sort order:
Map-reduce partition columns: _col1 (type: int)
Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: string), _col1 (type: int), _col2 (type: int)
Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Execution mode: vectorized, llap
Reduce Operator Tree:
Select Operator
expressions: VALUE._col0 (type: string), VALUE._col1 (type: int), VALUE._col2 (type: int)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink

PREHOOK: query: explain cbo
select * from t1 distribute by 1, b sort by 2
PREHOOK: type: QUERY
PREHOOK: Input: default@t1
#### A masked pattern was here ####
POSTHOOK: query: explain cbo
select * from t1 distribute by 1, b sort by 2
POSTHOOK: type: QUERY
POSTHOOK: Input: default@t1
#### A masked pattern was here ####
CBO PLAN:
HiveSortExchange(distribution=[hash[0, 1]], collation=[[1]])
HiveProject(a=[$0], b=[$1], c=[$2])
HiveTableScan(table=[[default, t1]], table:alias=[t1])

PREHOOK: query: explain
select * from t1 distribute by 1, b sort by 2
PREHOOK: type: QUERY
PREHOOK: Input: default@t1
#### A masked pattern was here ####
POSTHOOK: query: explain
select * from t1 distribute by 1, b sort by 2
POSTHOOK: type: QUERY
POSTHOOK: Input: default@t1
#### A masked pattern was here ####
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Tez
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: t1
Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: a (type: string), b (type: int), c (type: int)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col1 (type: int)
null sort order: z
sort order: +
Map-reduce partition columns: _col0 (type: string), _col1 (type: int)
Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: string), _col2 (type: int)
Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Execution mode: vectorized, llap
Reduce Operator Tree:
Select Operator
expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: int), VALUE._col1 (type: int)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink

PREHOOK: query: explain cbo
select * from t1 cluster by 1, b
PREHOOK: type: QUERY
PREHOOK: Input: default@t1
#### A masked pattern was here ####
POSTHOOK: query: explain cbo
select * from t1 cluster by 1, b
POSTHOOK: type: QUERY
POSTHOOK: Input: default@t1
#### A masked pattern was here ####
CBO PLAN:
HiveSortExchange(distribution=[hash[0, 1]], collation=[[0 ASC-nulls-first, 1 ASC-nulls-first]])
HiveProject(a=[$0], b=[$1], c=[$2])
HiveTableScan(table=[[default, t1]], table:alias=[t1])

PREHOOK: query: explain
select * from t1 cluster by 1, b
PREHOOK: type: QUERY
PREHOOK: Input: default@t1
#### A masked pattern was here ####
POSTHOOK: query: explain
select * from t1 cluster by 1, b
POSTHOOK: type: QUERY
POSTHOOK: Input: default@t1
#### A masked pattern was here ####
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Tez
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: t1
Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: a (type: string), b (type: int), c (type: int)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: string), _col1 (type: int)
null sort order: aa
sort order: ++
Map-reduce partition columns: _col0 (type: string), _col1 (type: int)
Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
value expressions: _col2 (type: int)
Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Execution mode: vectorized, llap
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: int), VALUE._col0 (type: int)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
