CostBasedJoinReorder is a logical optimization that the Spark Optimizer uses for join reordering in cost-based optimization.
Technically, CostBasedJoinReorder is just a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].
CostBasedJoinReorder applies the join optimizations on a logical plan with 2 or more consecutive inner or cross joins (possibly separated by Project operators) when spark.sql.cbo.enabled and spark.sql.cbo.joinReorder.enabled configuration properties are both enabled.
// Use shortcuts to read the values of the properties
scala> spark.sessionState.conf.cboEnabled
res0: Boolean = true
scala> spark.sessionState.conf.joinReorderEnabled
res1: Boolean = true
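If either shortcut reports false, the rule leaves plans untouched. The following is a minimal sketch of switching both properties on at runtime, assuming a spark-shell session where `spark` is the active SparkSession (both properties are off by default in Spark 2.3).

```scala
// Assumes spark-shell, where `spark` is already defined
// Turn on cost-based optimization and the join reordering that depends on it
spark.conf.set("spark.sql.cbo.enabled", true)
spark.conf.set("spark.sql.cbo.joinReorder.enabled", true)

// Read the values back through the shortcuts used above
val cboOn = spark.sessionState.conf.cboEnabled
val joinReorderOn = spark.sessionState.conf.joinReorderEnabled
```

The same properties can also be passed with --conf when starting spark-shell.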
CostBasedJoinReorder uses the row count statistic that is computed using the ANALYZE TABLE COMPUTE STATISTICS SQL command with no NOSCAN option.
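As a rough illustration of where that statistic ends up, the sketch below computes and reads back the row count of a throwaway table. It assumes a spark-shell session (`spark` in scope); the table name `stats_demo` and the value name `rowCount` are hypothetical and used for this example only.

```scala
// Assumes spark-shell, where `spark` is already defined
import org.apache.spark.sql.catalyst.TableIdentifier

// stats_demo is a hypothetical table name used only for this sketch
spark.range(10).write.mode("overwrite").saveAsTable("stats_demo")

// No NOSCAN, so Spark scans the table and records the row count
spark.sql("ANALYZE TABLE stats_demo COMPUTE STATISTICS")

// Read the row count back from the table metadata in the session catalog
val rowCount: Option[BigInt] = spark.sessionState.catalog
  .getTableMetadata(TableIdentifier("stats_demo"))
  .stats
  .flatMap(_.rowCount)
```

With NOSCAN, only the size in bytes is collected, so `rowCount` would stay empty and the rule would have nothing to work with.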
scala> spark.version
res0: String = 2.3.0
// Create tables and compute their row count statistics
// There have to be at least 2 joins
// Make the example reproducible
val tableNames = Seq("t1", "t2", "tiny")
import org.apache.spark.sql.catalyst.TableIdentifier
val tableIds = tableNames.map(name => TableIdentifier(name))
val sessionCatalog = spark.sessionState.catalog
tableIds.foreach { tableId =>
  sessionCatalog.dropTable(tableId, ignoreIfNotExists = true, purge = true)
}
val belowBroadcastJoinThreshold = spark.sessionState.conf.autoBroadcastJoinThreshold - 1
spark.range(belowBroadcastJoinThreshold).write.saveAsTable("t1")
// t2 is twice as big as t1
spark.range(2 * belowBroadcastJoinThreshold).write.saveAsTable("t2")
// tiny is much smaller (its row count is missing from this listing; 5 is an assumption)
spark.range(5).write.saveAsTable("tiny")
// Compute row count statistics
tableNames.foreach { t =>
  spark.sql(s"ANALYZE TABLE $t COMPUTE STATISTICS")
}
// Load the tables
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val tiny = spark.table("tiny")
// Example: Inner join with join condition
val q = t1.join(t2, Seq("id")).join(tiny, Seq("id"))
val plan = q.queryExecution.analyzed
scala> println(plan.numberedTreeString)
00 Project [id#51L]
01 +- Join Inner, (id#51L = id#57L)
02    :- Project [id#51L]
03    :  +- Join Inner, (id#51L = id#54L)
04    :     :- SubqueryAlias t1
05    :     :  +- Relation[id#51L] parquet
06    :     +- SubqueryAlias t2
07    :        +- Relation[id#54L] parquet
08    +- SubqueryAlias tiny
09       +- Relation[id#57L] parquet
// Eliminate SubqueryAlias logical operators as they are no longer needed
// and would "confuse" CostBasedJoinReorder
// CostBasedJoinReorder cares about how deep Joins are and reorders consecutive joins only
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
val noAliasesPlan = EliminateSubqueryAliases(plan)
scala> println(noAliasesPlan.numberedTreeString)
00 Project [id#51L]
01 +- Join Inner, (id#51L = id#57L)
02    :- Project [id#51L]
03    :  +- Join Inner, (id#51L = id#54L)
04    :     :- Relation[id#51L] parquet
05    :     +- Relation[id#54L] parquet
06    +- Relation[id#57L] parquet
// Let's go pro and create a custom RuleExecutor (i.e. an Optimizer)
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder
object Optimize extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("EliminateSubqueryAliases", Once, EliminateSubqueryAliases) ::
    Batch("Join Reorder", Once, CostBasedJoinReorder) :: Nil
}
val joinsReordered = Optimize.execute(plan)
scala> println(joinsReordered.numberedTreeString)
00 Project [id#51L]
01 +- Join Inner, (id#51L = id#54L)
02    :- Project [id#51L]
03    :  +- Join Inner, (id#51L = id#57L)
04    :     :- Relation[id#51L] parquet
05    :     +- Relation[id#57L] parquet
06    +- Relation[id#54L] parquet
// Execute the plans
// Compare the plans as diagrams in web UI @ http://localhost:4040/SQL
// We'd have to use too many internals so let's turn CBO on and off
// Moreover, please remember that the query "phases" are cached
// That's why we copy and paste the entire query for execution
import org.apache.spark.sql.internal.SQLConf
val cc = SQLConf.get
cc.setConf(SQLConf.CBO_ENABLED, false)
val q = t1.join(t2, Seq("id")).join(tiny, Seq("id"))
q.collect.foreach(_ => ())
cc.setConf(SQLConf.CBO_ENABLED, true)
val q = t1.join(t2, Seq("id")).join(tiny, Seq("id"))
q.collect.foreach(_ => ())
FIXME Examples of other join queries
CostBasedJoinReorder is part of the Join Reorder once-executed batch of rules.
Enable Add the following line to
Refer to Logging.
apply(plan: LogicalPlan): LogicalPlan
apply is part of Rule Contract to apply a rule to a logical plan (aka execute a rule).
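apply traverses the input logical plan down and tries to reorder the following logical operators: Join operators (inner or cross) with a join condition, and Project operators that have such a Join as the child and a project list of Attribute leaf expressions only.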
reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan
reorder is used exclusively when CostBasedJoinReorder is applied to a logical plan.
replaceWithOrderedJoin(plan: LogicalPlan): LogicalPlan
replaceWithOrderedJoin is used recursively and when CostBasedJoinReorder is reordering…FIXME
extractInnerJoins(plan: LogicalPlan): (Seq[LogicalPlan], Set[Expression])
extractInnerJoins finds consecutive Join logical operators (inner or cross) with join conditions, or Project logical operators with such a Join logical operator as the child and a project list of Attribute leaf expressions only.
For Project operators, extractInnerJoins calls itself recursively with the Join operator inside.
In the end, extractInnerJoins gives the collection of logical plans under the consecutive Join logical operators (possibly separated by Project operators only) and their join conditions (for which And expressions have been split).
extractInnerJoins is used recursively when CostBasedJoinReorder is reordering a logical plan.
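The recursion described above can be sketched on a toy plan model. The code below is not Spark's implementation: `Plan`, `Expr`, `Attr`, `Relation`, `splitConjuncts` and the other names are simplified stand-ins invented for this sketch, and only the shape of the pattern match is meant to mirror the description.

```scala
object ExtractInnerJoinsSketch {
  // Toy expressions (stand-ins for Catalyst expressions)
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // Toy join types
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftOuter extends JoinType

  // Toy logical operators
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Join(left: Plan, right: Plan, joinType: JoinType,
      condition: Option[Expr]) extends Plan
  final case class Project(projectList: Seq[Expr], child: Plan) extends Plan

  // Split a condition on And into its individual conjuncts
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other => Seq(other)
  }

  private def isInnerLike(t: JoinType): Boolean = t == Inner || t == Cross

  // Returns the plans under consecutive inner-like joins and their split conditions
  def extractInnerJoins(plan: Plan): (Seq[Plan], Set[Expr]) = plan match {
    case Join(left, right, joinType, Some(cond)) if isInnerLike(joinType) =>
      val (leftPlans, leftConds) = extractInnerJoins(left)
      val (rightPlans, rightConds) = extractInnerJoins(right)
      (leftPlans ++ rightPlans, splitConjuncts(cond).toSet ++ leftConds ++ rightConds)
    // A Project of attributes only is "transparent": recurse into the Join inside
    case Project(projectList, j @ Join(_, _, joinType, Some(_)))
        if isInnerLike(joinType) && projectList.forall(_.isInstanceOf[Attr]) =>
      extractInnerJoins(j)
    // Anything else ends the chain and is kept as a single item to join
    case other =>
      (Seq(other), Set.empty[Expr])
  }

  def main(args: Array[String]): Unit = {
    // Same shape as the example query: (t1 join t2) join tiny, with a Project in between
    val t1t2 = Join(Relation("t1"), Relation("t2"), Inner,
      Some(EqualTo(Attr("t1.id"), Attr("t2.id"))))
    val query = Join(Project(Seq(Attr("t1.id")), t1t2), Relation("tiny"), Inner,
      Some(EqualTo(Attr("t1.id"), Attr("tiny.id"))))
    val (items, conditions) = extractInnerJoins(query)
    println(items)
    println(conditions)
  }
}
```

Flattening the joins into items and a set of conditions is what lets the reordering step consider the relations in any order instead of the order in which the query was written.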