Skip to content

Commit

Permalink
Extended CollationSuite and added tests where SortMergeJoin is forced
Browse files Browse the repository at this point in the history
  • Loading branch information
vladanvasi-db committed Nov 6, 2024
1 parent 46fe10a commit 81b08fc
Showing 1 changed file with 162 additions and 5 deletions.
167 changes: 162 additions & 5 deletions sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1549,7 +1549,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
}
}

test("hash join should be used for collated strings") {
test("hash join should be used for collated strings if sort merge join is not forced") {
val t1 = "T_1"
val t2 = "T_2"

Expand Down Expand Up @@ -1598,11 +1598,38 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
case b: HashJoin => b.leftKeys.head
}.head.isInstanceOf[CollationKey])
}

// Disable broadcast join to force sort merge join.
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.y")
checkAnswer(df, t.result)

val queryPlan = df.queryExecution.executedPlan

// confirm that sort merge join is used instead of hash join
assert(
collectFirst(queryPlan) {
case _: HashJoin => ()
}.isEmpty
)
assert(
collectFirst(queryPlan) {
case _: SortMergeJoinExec => ()
}.nonEmpty
)

// Only if collation doesn't support binary equality, collation key should be injected.
if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
assert(queryPlan.toString().contains("collationkey"))
} else {
assert(!queryPlan.toString().contains("collationkey"))
}
}
}
})
}

test("hash join should be used for arrays of collated strings") {
test("hash join should be used for arrays of collated strings if sort merge join is not forced") {
val t1 = "T_1"
val t2 = "T_2"

Expand Down Expand Up @@ -1656,11 +1683,39 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
case b: BroadcastHashJoinExec => b.leftKeys.head
}.head.isInstanceOf[ArrayTransform])
}

// Disable broadcast join to force sort merge join.
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.y")
checkAnswer(df, t.result)

val queryPlan = df.queryExecution.executedPlan

// confirm that sort merge join is used instead of hash join
assert(
collectFirst(queryPlan) {
case _: HashJoin => ()
}.isEmpty
)
assert(
collectFirst(queryPlan) {
case _: SortMergeJoinExec => ()
}.nonEmpty
)

// Only if collation doesn't support binary equality, collation key should be injected.
if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
assert(queryPlan.toString().contains("collationkey"))
} else {
assert(!queryPlan.toString().contains("collationkey"))
}
}
}
})
}

test("hash join should be used for arrays of arrays of collated strings") {
test("hash join should be used for arrays of arrays of collated strings " +
"if sort merge join is not forced") {
val t1 = "T_1"
val t2 = "T_2"

Expand Down Expand Up @@ -1718,11 +1773,38 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
case b: BroadcastHashJoinExec => b.leftKeys.head
}.head.isInstanceOf[ArrayTransform])
}

// Disable broadcast join to force sort merge join.
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.y")
checkAnswer(df, t.result)

val queryPlan = df.queryExecution.executedPlan

// confirm that sort merge join is used instead of hash join
assert(
collectFirst(queryPlan) {
case _: HashJoin => ()
}.isEmpty
)
assert(
collectFirst(queryPlan) {
case _: SortMergeJoinExec => ()
}.nonEmpty
)

// Only if collation doesn't support binary equality, collation key should be injected.
if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
assert(queryPlan.toString().contains("collationkey"))
} else {
assert(!queryPlan.toString().contains("collationkey"))
}
}
}
})
}

test("hash join should respect collation for struct of strings") {
test("hash and sort merge join should respect collation for struct of strings") {
val t1 = "T_1"
val t2 = "T_2"

Expand Down Expand Up @@ -1771,11 +1853,39 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
} else {
assert(!queryPlan.toString().contains("collationkey"))
}

// Disable broadcast join to force sort merge join.
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.y")
checkAnswer(df, t.result)

val queryPlan = df.queryExecution.executedPlan

// confirm that sort merge join is used instead of hash join
assert(
collectFirst(queryPlan) {
case _: HashJoin => ()
}.isEmpty
)
assert(
collectFirst(queryPlan) {
case _: SortMergeJoinExec => ()
}.nonEmpty
)

// Only if collation doesn't support binary equality, collation key should be injected.
if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
assert(queryPlan.toString().contains("collationkey"))
} else {
assert(!queryPlan.toString().contains("collationkey"))
}
}
}
})
}

test("hash join should respect collation for struct of array of struct of strings") {
test("hash and sort merge join should respect collation " +
"for struct of array of struct of strings") {
val t1 = "T_1"
val t2 = "T_2"

Expand Down Expand Up @@ -1830,6 +1940,33 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
} else {
assert(!queryPlan.toString().contains("collationkey"))
}

// Disable broadcast join to force sort merge join.
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.y")
checkAnswer(df, t.result)

val queryPlan = df.queryExecution.executedPlan

// confirm that sort merge join is used instead of hash join
assert(
collectFirst(queryPlan) {
case _: HashJoin => ()
}.isEmpty
)
assert(
collectFirst(queryPlan) {
case _: SortMergeJoinExec => ()
}.nonEmpty
)

// Only if collation doesn't support binary equality, collation key should be injected.
if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
assert(queryPlan.toString().contains("collationkey"))
} else {
assert(!queryPlan.toString().contains("collationkey"))
}
}
}
})
}
Expand Down Expand Up @@ -1914,6 +2051,26 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
case _: SortMergeJoinExec => ()
}.isEmpty
)

// Disable broadcast join to force sort merge join.
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.x AND $t1.y = $t2.y")
checkAnswer(df, t.result)

val queryPlan = df.queryExecution.executedPlan

// confirm that sort merge join is used instead of hash join
assert(
collectFirst(queryPlan) {
case _: HashJoin => ()
}.isEmpty
)
assert(
collectFirst(queryPlan) {
case _: SortMergeJoinExec => ()
}.nonEmpty
)
}
}
})
}
Expand Down

0 comments on commit 81b08fc

Please sign in to comment.