diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
index 42b772d7eba21..2260460b07c30 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -43,6 +44,39 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
   private val collationNonPreservingSources = Seq("orc", "csv", "json", "text")
   private val allFileBasedDataSources = collationPreservingSources ++ collationNonPreservingSources
 
+  @inline
+  private def isSortMergeForced: Boolean = {
+    SQLConf.get.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD) == -1
+  }
+
+  private def checkRightTypeOfJoinUsed(queryPlan: SparkPlan): Unit = {
+    assert(
+      // If sort merge join is forced, we should not see HashJoin in the plan.
+      isSortMergeForced ||
+        // If sort merge join is not forced, we should see HashJoin in the plan
+        // and not SortMergeJoin.
+        collectFirst(queryPlan) {
+          case _: HashJoin => ()
+        }.nonEmpty &&
+          collectFirst(queryPlan) {
+            case _: SortMergeJoinExec => ()
+          }.isEmpty
+    )
+
+    assert(
+      // If sort merge join is not forced, we should not see SortMergeJoin in the plan.
+      !isSortMergeForced ||
+        // If sort merge join is forced, we should see SortMergeJoin in the plan
+        // and not HashJoin.
+        collectFirst(queryPlan) {
+          case _: HashJoin => ()
+        }.isEmpty &&
+          collectFirst(queryPlan) {
+            case _: SortMergeJoinExec => ()
+          }.nonEmpty
+    )
+  }
+
   test("collate returns proper type") {
     Seq(
       "utf8_binary",
@@ -1562,71 +1596,49 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
       HashJoinTestCase("UNICODE_CI_RTRIM", "aa", "AA ",
         Seq(Row("aa", 1, "AA ", 2), Row("aa", 1, "aa", 2)))
     )
-
-    testCases.foreach(t => {
+    for {
+      t <- testCases
+      broadcastJoinThreshold <- Seq(-1, SQLConf.get.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
+    } {
       withTable(t1, t2) {
-        sql(s"CREATE TABLE $t1 (x STRING COLLATE ${t.collation}, i int) USING PARQUET")
-        sql(s"INSERT INTO $t1 VALUES ('${t.data1}', 1)")
-
-        sql(s"CREATE TABLE $t2 (y STRING COLLATE ${t.collation}, j int) USING PARQUET")
-        sql(s"INSERT INTO $t2 VALUES ('${t.data2}', 2), ('${t.data1}', 2)")
-
-        val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.y")
-        checkAnswer(df, t.result)
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> broadcastJoinThreshold.toString) {
+          sql(s"CREATE TABLE $t1 (x STRING COLLATE ${t.collation}, i int) USING PARQUET")
+          sql(s"INSERT INTO $t1 VALUES ('${t.data1}', 1)")
 
-        val queryPlan = df.queryExecution.executedPlan
+          sql(s"CREATE TABLE $t2 (y STRING COLLATE ${t.collation}, j int) USING PARQUET")
+          sql(s"INSERT INTO $t2 VALUES ('${t.data2}', 2), ('${t.data1}', 2)")
 
-        // confirm that hash join is used instead of sort merge join
-        assert(
-          collectFirst(queryPlan) {
-            case _: HashJoin => ()
-          }.nonEmpty
-        )
-        assert(
-          collectFirst(queryPlan) {
-            case _: SortMergeJoinExec => ()
-          }.isEmpty
-        )
-
-        // Only if collation doesn't support binary equality, collation key should be injected.
-        if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
-          assert(collectFirst(queryPlan) {
-            case b: HashJoin => b.leftKeys.head
-          }.head.isInstanceOf[CollationKey])
-        } else {
-          assert(!collectFirst(queryPlan) {
-            case b: HashJoin => b.leftKeys.head
-          }.head.isInstanceOf[CollationKey])
-        }
-
-        // Disable broadcast join to force sort merge join.
-        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
           val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.y")
           checkAnswer(df, t.result)
 
           val queryPlan = df.queryExecution.executedPlan
 
-          // confirm that sort merge join is used instead of hash join
-          assert(
-            collectFirst(queryPlan) {
-              case _: HashJoin => ()
-            }.isEmpty
-          )
-          assert(
-            collectFirst(queryPlan) {
-              case _: SortMergeJoinExec => ()
-            }.nonEmpty
-          )
+          // confirm that right kind of join is used.
+          checkRightTypeOfJoinUsed(queryPlan)
 
-          // Only if collation doesn't support binary equality, collation key should be injected.
-          if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
-            assert(queryPlan.toString().contains("collationkey"))
-          } else {
-            assert(!queryPlan.toString().contains("collationkey"))
+          if (isSortMergeForced) {
+            // Only if collation doesn't support binary equality, collation key should be injected.
+            if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
+              assert(queryPlan.toString().contains("collationkey"))
+            } else {
+              assert(!queryPlan.toString().contains("collationkey"))
+            }
+          }
+          else {
+            // Only if collation doesn't support binary equality, collation key should be injected.
+            if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
+              assert(collectFirst(queryPlan) {
+                case b: HashJoin => b.leftKeys.head
+              }.head.isInstanceOf[CollationKey])
+            } else {
+              assert(!collectFirst(queryPlan) {
+                case b: HashJoin => b.leftKeys.head
+              }.head.isInstanceOf[CollationKey])
+            }
+          }
         }
       }
-    })
+    }
   }
 
   test("hash join should be used for arrays of collated strings if sort merge join is not forced") {
@@ -1647,71 +1659,50 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
       Seq(Row(Seq("aa"), 1, Seq("AA "), 2), Row(Seq("aa"), 1, Seq("aa"), 2)))
     )
-    testCases.foreach(t => {
+    for {
+      t <- testCases
+      broadcastJoinThreshold <- Seq(-1, SQLConf.get.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
+    } {
       withTable(t1, t2) {
-        sql(s"CREATE TABLE $t1 (x ARRAY<STRING COLLATE ${t.collation}>, i int) USING PARQUET")
-        sql(s"INSERT INTO $t1 VALUES (array('${t.data1}'), 1)")
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> broadcastJoinThreshold.toString) {
+          sql(s"CREATE TABLE $t1 (x ARRAY<STRING COLLATE ${t.collation}>, i int) USING PARQUET")
+          sql(s"INSERT INTO $t1 VALUES (array('${t.data1}'), 1)")
 
-        sql(s"CREATE TABLE $t2 (y ARRAY<STRING COLLATE ${t.collation}>, j int) USING PARQUET")
-        sql(s"INSERT INTO $t2 VALUES (array('${t.data2}'), 2), (array('${t.data1}'), 2)")
+          sql(s"CREATE TABLE $t2 (y ARRAY<STRING COLLATE ${t.collation}>, j int) USING PARQUET")
+          sql(s"INSERT INTO $t2 VALUES (array('${t.data2}'), 2), (array('${t.data1}'), 2)")
 
-        val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.y")
-        checkAnswer(df, t.result)
-
-        val queryPlan = df.queryExecution.executedPlan
-
-        // confirm that hash join is used instead of sort merge join
-        assert(
-          collectFirst(queryPlan) {
-            case _: HashJoin => ()
-          }.nonEmpty
-        )
-        assert(
-          collectFirst(queryPlan) {
-            case _: ShuffledJoin => ()
-          }.isEmpty
-        )
-
-        // Only if collation doesn't support binary equality, collation key should be injected.
-        if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
-          assert(collectFirst(queryPlan) {
-            case b: BroadcastHashJoinExec => b.leftKeys.head
-          }.head.asInstanceOf[ArrayTransform].function.asInstanceOf[LambdaFunction].
-            function.isInstanceOf[CollationKey])
-        } else {
-          assert(!collectFirst(queryPlan) {
-            case b: BroadcastHashJoinExec => b.leftKeys.head
-          }.head.isInstanceOf[ArrayTransform])
-        }
-
-        // Disable broadcast join to force sort merge join.
-        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
           val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.y")
           checkAnswer(df, t.result)
 
           val queryPlan = df.queryExecution.executedPlan
 
-          // confirm that sort merge join is used instead of hash join
-          assert(
-            collectFirst(queryPlan) {
-              case _: HashJoin => ()
-            }.isEmpty
-          )
-          assert(
-            collectFirst(queryPlan) {
-              case _: SortMergeJoinExec => ()
-            }.nonEmpty
-          )
+          // confirm that right kind of join is used.
+          checkRightTypeOfJoinUsed(queryPlan)
 
-          // Only if collation doesn't support binary equality, collation key should be injected.
-          if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
-            assert(queryPlan.toString().contains("collationkey"))
-          } else {
-            assert(!queryPlan.toString().contains("collationkey"))
+          if (isSortMergeForced) {
+            // Only if collation doesn't support binary equality, collation key should be injected.
+            if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
+              assert(queryPlan.toString().contains("collationkey"))
+            } else {
+              assert(!queryPlan.toString().contains("collationkey"))
+            }
+          }
+          else {
+            // Only if collation doesn't support binary equality, collation key should be injected.
+            if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
+              assert(collectFirst(queryPlan) {
+                case b: BroadcastHashJoinExec => b.leftKeys.head
+              }.head.asInstanceOf[ArrayTransform].function.asInstanceOf[LambdaFunction].
+                function.isInstanceOf[CollationKey])
+            } else {
+              assert(!collectFirst(queryPlan) {
+                case b: BroadcastHashJoinExec => b.leftKeys.head
+              }.head.isInstanceOf[ArrayTransform])
+            }
+          }
         }
       }
-    })
+    }
   }
 
   test("hash join should be used for arrays of arrays of collated strings " +
@@ -1733,75 +1724,54 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
       Seq(Row(Seq(Seq("aa")), 1, Seq(Seq("AA ")), 2), Row(Seq(Seq("aa")), 1, Seq(Seq("aa")), 2)))
     )
-    testCases.foreach(t => {
+    for {
+      t <- testCases
+      broadcastJoinThreshold <- Seq(-1, SQLConf.get.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
+    } {
       withTable(t1, t2) {
-        sql(s"CREATE TABLE $t1 (x ARRAY<ARRAY<STRING COLLATE ${t.collation}>>, i int) USING " +
-          s"PARQUET")
-        sql(s"INSERT INTO $t1 VALUES (array(array('${t.data1}')), 1)")
-
-        sql(s"CREATE TABLE $t2 (y ARRAY<ARRAY<STRING COLLATE ${t.collation}>>, j int) USING " +
-          s"PARQUET")
-        sql(s"INSERT INTO $t2 VALUES (array(array('${t.data2}')), 2)," +
-          s" (array(array('${t.data1}')), 2)")
-
-        val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.y")
-        checkAnswer(df, t.result)
-
-        val queryPlan = df.queryExecution.executedPlan
-
-        // confirm that hash join is used instead of sort merge join
-        assert(
-          collectFirst(queryPlan) {
-            case _: HashJoin => ()
-          }.nonEmpty
-        )
-        assert(
-          collectFirst(queryPlan) {
-            case _: ShuffledJoin => ()
-          }.isEmpty
-        )
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> broadcastJoinThreshold.toString) {
+          sql(s"CREATE TABLE $t1 (x ARRAY<ARRAY<STRING COLLATE ${t.collation}>>, i int) USING " +
+            s"PARQUET")
+          sql(s"INSERT INTO $t1 VALUES (array(array('${t.data1}')), 1)")
 
-        // Only if collation doesn't support binary equality, collation key should be injected.
-        if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
-          assert(collectFirst(queryPlan) {
-            case b: BroadcastHashJoinExec => b.leftKeys.head
-          }.head.asInstanceOf[ArrayTransform].function.
-            asInstanceOf[LambdaFunction].function.asInstanceOf[ArrayTransform].function.
-            asInstanceOf[LambdaFunction].function.isInstanceOf[CollationKey])
-        } else {
-          assert(!collectFirst(queryPlan) {
-            case b: BroadcastHashJoinExec => b.leftKeys.head
-          }.head.isInstanceOf[ArrayTransform])
-        }
+          sql(s"CREATE TABLE $t2 (y ARRAY<ARRAY<STRING COLLATE ${t.collation}>>, j int) USING " +
+            s"PARQUET")
+          sql(s"INSERT INTO $t2 VALUES (array(array('${t.data2}')), 2)," +
+            s" (array(array('${t.data1}')), 2)")
 
-        // Disable broadcast join to force sort merge join.
-        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
           val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.y")
           checkAnswer(df, t.result)
 
           val queryPlan = df.queryExecution.executedPlan
 
-          // confirm that sort merge join is used instead of hash join
-          assert(
-            collectFirst(queryPlan) {
-              case _: HashJoin => ()
-            }.isEmpty
-          )
-          assert(
-            collectFirst(queryPlan) {
-              case _: SortMergeJoinExec => ()
-            }.nonEmpty
-          )
+          // confirm that right kind of join is used.
+          checkRightTypeOfJoinUsed(queryPlan)
 
-          // Only if collation doesn't support binary equality, collation key should be injected.
-          if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
-            assert(queryPlan.toString().contains("collationkey"))
-          } else {
-            assert(!queryPlan.toString().contains("collationkey"))
+          if (isSortMergeForced) {
+            // Only if collation doesn't support binary equality, collation key should be injected.
+            if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
+              assert(queryPlan.toString().contains("collationkey"))
+            } else {
+              assert(!queryPlan.toString().contains("collationkey"))
+            }
+          }
+          else {
+            // Only if collation doesn't support binary equality, collation key should be injected.
+            if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
+              assert(collectFirst(queryPlan) {
+                case b: BroadcastHashJoinExec => b.leftKeys.head
+              }.head.asInstanceOf[ArrayTransform].function.
+                asInstanceOf[LambdaFunction].function.asInstanceOf[ArrayTransform].function.
+                asInstanceOf[LambdaFunction].function.isInstanceOf[CollationKey])
+            } else {
+              assert(!collectFirst(queryPlan) {
+                case b: BroadcastHashJoinExec => b.leftKeys.head
+              }.head.isInstanceOf[ArrayTransform])
+            }
+          }
         }
       }
-    })
+    }
   }
 
   test("hash and sort merge join should respect collation for struct of strings") {
@@ -1821,57 +1791,26 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
      HashJoinTestCase("UNICODE_CI_RTRIM", "aa", "AA ",
        Seq(Row(Row("aa"), 1, Row("AA "), 2), Row(Row("aa"), 1, Row("aa"), 2)))
     )
-    testCases.foreach(t => {
+    for {
+      t <- testCases
+      broadcastJoinThreshold <- Seq(-1, SQLConf.get.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
+    } {
       withTable(t1, t2) {
-        sql(s"CREATE TABLE $t1 (x STRUCT<f: STRING COLLATE ${t.collation}>, i int) USING PARQUET")
-        sql(s"INSERT INTO $t1 VALUES (named_struct('f', '${t.data1}'), 1)")
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> broadcastJoinThreshold.toString) {
+          sql(s"CREATE TABLE $t1 (x STRUCT<f: STRING COLLATE ${t.collation}>, i int) USING PARQUET")
+          sql(s"INSERT INTO $t1 VALUES (named_struct('f', '${t.data1}'), 1)")
 
-        sql(s"CREATE TABLE $t2 (y STRUCT<f: STRING COLLATE ${t.collation}>, j int) USING PARQUET")
-        sql(s"INSERT INTO $t2 VALUES (named_struct('f', '${t.data2}'), 2)," +
-          s" (named_struct('f', '${t.data1}'), 2)")
+          sql(s"CREATE TABLE $t2 (y STRUCT<f: STRING COLLATE ${t.collation}>, j int) USING PARQUET")
+          sql(s"INSERT INTO $t2 VALUES (named_struct('f', '${t.data2}'), 2)," +
+            s" (named_struct('f', '${t.data1}'), 2)")
 
-        val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.y")
-        checkAnswer(df, t.result)
-
-        val queryPlan = df.queryExecution.executedPlan
-
-        // Confirm that hash join is used instead of sort merge join.
-        assert(
-          collectFirst(queryPlan) {
-            case _: HashJoin => ()
-          }.nonEmpty
-        )
-        assert(
-          collectFirst(queryPlan) {
-            case _: ShuffledJoin => ()
-          }.isEmpty
-        )
-
-        // Only if collation doesn't support binary equality, collation key should be injected.
-        if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
-          assert(queryPlan.toString().contains("collationkey"))
-        } else {
-          assert(!queryPlan.toString().contains("collationkey"))
-        }
-
-        // Disable broadcast join to force sort merge join.
-        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
           val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.y")
          checkAnswer(df, t.result)
 
           val queryPlan = df.queryExecution.executedPlan
 
-          // confirm that sort merge join is used instead of hash join
-          assert(
-            collectFirst(queryPlan) {
-              case _: HashJoin => ()
-            }.isEmpty
-          )
-          assert(
-            collectFirst(queryPlan) {
-              case _: SortMergeJoinExec => ()
-            }.nonEmpty
-          )
+          // confirm that right kind of join is used.
+          checkRightTypeOfJoinUsed(queryPlan)
 
           // Only if collation doesn't support binary equality, collation key should be injected.
           if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
@@ -1881,7 +1820,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
           }
         }
       }
-    })
+    }
   }
 
   test("hash and sort merge join should respect collation " +
@@ -1905,60 +1844,30 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
       Seq(Row(Row(Seq(Row("aa"))), 1, Row(Seq(Row("AA "))), 2),
         Row(Row(Seq(Row("aa"))), 1, Row(Seq(Row("aa"))), 2)))
     )
-    testCases.foreach(t => {
-      withTable(t1, t2) {
-        sql(s"CREATE TABLE $t1 (x STRUCT<f: ARRAY<STRUCT<f: STRING COLLATE ${t.collation}>>>, " +
-          s"i int) USING PARQUET")
-        sql(s"INSERT INTO $t1 VALUES (named_struct('f', array(named_struct('f', '${t.data1}'))), 1)"
-        )
-
-        sql(s"CREATE TABLE $t2 (y STRUCT<f: ARRAY<STRUCT<f: STRING COLLATE ${t.collation}>>>, " +
-          s"j int) USING PARQUET")
-        sql(s"INSERT INTO $t2 VALUES (named_struct('f', array(named_struct('f', '${t.data2}'))), 2)"
-          + s", (named_struct('f', array(named_struct('f', '${t.data1}'))), 2)")
-        val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.y")
-        checkAnswer(df, t.result)
-
-        val queryPlan = df.queryExecution.executedPlan
-
-        // confirm that hash join is used instead of sort merge join
-        assert(
-          collectFirst(queryPlan) {
-            case _: HashJoin => ()
-          }.nonEmpty
-        )
-        assert(
-          collectFirst(queryPlan) {
-            case _: ShuffledJoin => ()
-          }.isEmpty
-        )
+    for {
+      t <- testCases
+      broadcastJoinThreshold <- Seq(-1, SQLConf.get.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
+    } {
+      withTable(t1, t2) {
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> broadcastJoinThreshold.toString) {
+          sql(s"CREATE TABLE $t1 (x STRUCT<f: ARRAY<STRUCT<f: STRING COLLATE ${t.collation}>>>, " +
+            s"i int) USING PARQUET")
+          sql(s"INSERT INTO $t1 VALUES (named_struct('f', array(named_struct('f', " +
+            s"'${t.data1}'))), 1)")
 
-        // Only if collation doesn't support binary equality, collation key should be injected.
-        if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
-          assert(queryPlan.toString().contains("collationkey"))
-        } else {
-          assert(!queryPlan.toString().contains("collationkey"))
-        }
+          sql(s"CREATE TABLE $t2 (y STRUCT<f: ARRAY<STRUCT<f: STRING COLLATE ${t.collation}>>>, " +
+            s"j int) USING PARQUET")
+          sql(s"INSERT INTO $t2 VALUES (named_struct('f', array(named_struct('f', " +
+            s"'${t.data2}'))), 2), (named_struct('f', array(named_struct('f', '${t.data1}'))), 2)")
 
-        // Disable broadcast join to force sort merge join.
-        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
           val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.y")
           checkAnswer(df, t.result)
 
           val queryPlan = df.queryExecution.executedPlan
 
-          // confirm that sort merge join is used instead of hash join
-          assert(
-            collectFirst(queryPlan) {
-              case _: HashJoin => ()
-            }.isEmpty
-          )
-          assert(
-            collectFirst(queryPlan) {
-              case _: SortMergeJoinExec => ()
-            }.nonEmpty
-          )
+          // confirm that right kind of join is used.
+          checkRightTypeOfJoinUsed(queryPlan)
 
           // Only if collation doesn't support binary equality, collation key should be injected.
           if (!CollationFactory.fetchCollation(t.collation).isUtf8BinaryType) {
@@ -1968,7 +1877,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
           }
         }
       }
-    })
+    }
   }
 
   test("rewrite with collationkey should be an excludable rule") {
@@ -2028,51 +1937,27 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
         "'a', 'a', 1", "'A', 'A ', 1", Row("a", "a", 1, "A", "A ", 1))
     )
-    testCases.foreach(t => {
+    for {
+      t <- testCases
+      broadcastJoinThreshold <- Seq(-1, SQLConf.get.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
+    } {
       withTable(t1, t2) {
-        sql(s"CREATE TABLE $t1 (x ${t.type1}, y ${t.type2}, i int) USING PARQUET")
-        sql(s"INSERT INTO $t1 VALUES (${t.data1})")
-        sql(s"CREATE TABLE $t2 (x ${t.type1}, y ${t.type2}, i int) USING PARQUET")
-        sql(s"INSERT INTO $t2 VALUES (${t.data2})")
-
-        val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.x AND $t1.y = $t2.y")
-        checkAnswer(df, t.result)
-
-        val queryPlan = df.queryExecution.executedPlan
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> broadcastJoinThreshold.toString) {
+          sql(s"CREATE TABLE $t1 (x ${t.type1}, y ${t.type2}, i int) USING PARQUET")
+          sql(s"INSERT INTO $t1 VALUES (${t.data1})")
+          sql(s"CREATE TABLE $t2 (x ${t.type1}, y ${t.type2}, i int) USING PARQUET")
+          sql(s"INSERT INTO $t2 VALUES (${t.data2})")
 
-        // confirm that hash join is used instead of sort merge join
-        assert(
-          collectFirst(queryPlan) {
-            case _: HashJoin => ()
-          }.nonEmpty
-        )
-        assert(
-          collectFirst(queryPlan) {
-            case _: SortMergeJoinExec => ()
-          }.isEmpty
-        )
-
-        // Disable broadcast join to force sort merge join.
-        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
           val df = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.x = $t2.x AND $t1.y = $t2.y")
           checkAnswer(df, t.result)
 
           val queryPlan = df.queryExecution.executedPlan
 
-          // confirm that sort merge join is used instead of hash join
-          assert(
-            collectFirst(queryPlan) {
-              case _: HashJoin => ()
-            }.isEmpty
-          )
-          assert(
-            collectFirst(queryPlan) {
-              case _: SortMergeJoinExec => ()
-            }.nonEmpty
-          )
+          // confirm that right kind of join is used.
+          checkRightTypeOfJoinUsed(queryPlan)
         }
       }
-    })
+    }
   }
 
   test("hll sketch aggregate should respect collation") {