SparkPlan is the contract in Spark SQL for physical operators to build a physical query plan (aka query execution plan).
SparkPlan is a recursive data structure in Spark SQL’s Catalyst tree manipulation framework. As such, it represents both a single physical operator in a physical query execution plan and the plan itself (i.e. a tree of physical operators of a structured query).
Note
|
A structured query can be expressed using Spark SQL’s high-level Dataset API for Scala, Java, Python, R or good ol' SQL. |
A SparkPlan physical operator is a Catalyst tree node that may have zero or more child physical operators.
Note
|
A structured query is basically a single SparkPlan physical operator with child physical operators.
|
Note
|
Spark SQL uses the Catalyst tree manipulation framework to compose nodes into a tree of (logical or physical) operators. In this particular case, SparkPlan physical operator nodes are composed to build the physical execution plan tree of a structured query.
|
The entry point to the Physical Operator Execution Pipeline is execute.
When executed, SparkPlan executes the internal query implementation in a named scope (for visualization purposes, e.g. in the web UI). This triggers prepare of the child physical operators first, followed by prepareSubqueries and finally doPrepare. After subqueries have finished, doExecute is eventually triggered.
The result of executing a SparkPlan is an RDD of internal binary rows, i.e. RDD[InternalRow].
Note
|
Executing a structured query is simply a translation of the higher-level Dataset-based description to an RDD-based runtime representation that Spark will in the end execute (once a Dataset action is used). |
Caution
|
FIXME Picture between Spark SQL’s Dataset ⇒ Spark Core’s RDD |
SparkPlan has access to the owning SparkContext.
Note
|
execute is called when… The final execution of a structured query happens only when an RDD action is executed on the RDD of the structured query. Hence the need for Spark SQL’s high-level Dataset API, in which the Dataset operators simply execute an RDD action on the corresponding RDD. Easy, isn’t it? |
Tip
|
Use the explain operator to see the execution plan of a structured query.
val q = // your query here
q.explain
You may also access the execution plan directly.
val q = // your query here
q.queryExecution.sparkPlan |
The SparkPlan contract assumes that concrete physical operators define doExecute method (with optional hooks like doPrepare) which is executed when the physical operator is executed.
SparkPlan has the following final methods that prepare the execution environment and pass calls to the corresponding methods (that constitute the SparkPlan Contract).
Name | Description |
---|---|
execute | "Executes" a physical operator (and its children), which triggers physical query planning and in the end generates an RDD[InternalRow]. final def execute(): RDD[InternalRow] Used mostly when… Internally, … |
executeQuery | Executes a physical operator in a single RDD scope, i.e. all RDDs created during execution of the physical operator have the same scope. protected final def executeQuery[T](query: => T): T |
prepare | Prepares a physical operator for execution. final def prepare(): Unit Internally, … |
executeBroadcast | Calls doExecuteBroadcast |
Name | Description |
---|---|
Binary physical operator with two child physical operators |
|
Leaf physical operator with no children. By default, the set of all attributes that are produced is exactly the set of attributes that are output. |
|
|
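The recursive tree shape described above can be pictured with a small Spark-free model. This is only a sketch: `Op`, `Scan`, `Filter` and `Join` are hypothetical names chosen for illustration and are not Spark classes. It shows a leaf node, a one-child node and a binary node, and that a whole plan is just its root node.

```scala
// Toy model of a plan tree (NOT Spark's classes): every node lists its children.
object ToyTree {
  sealed trait Op { def children: Seq[Op] }

  final case class Scan(table: String) extends Op {       // leaf: no children
    def children: Seq[Op] = Nil
  }
  final case class Filter(child: Op) extends Op {         // one child
    def children: Seq[Op] = Seq(child)
  }
  final case class Join(left: Op, right: Op) extends Op { // binary: two children
    def children: Seq[Op] = Seq(left, right)
  }

  // A plan is just its root node; walking it visits the whole tree.
  def size(op: Op): Int = 1 + op.children.map(size).sum

  val plan: Op = Join(Filter(Scan("t1")), Scan("t2"))
}
```

Calling `ToyTree.size(ToyTree.plan)` counts every operator reachable from the root, which is the sense in which a single node also stands for the whole plan.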
Note
|
The naming convention for physical operators in Spark’s source code is to have their names end with the Exec suffix, e.g. DebugExec or LocalTableScanExec. The suffix is removed when the operator is displayed, e.g. in the web UI.
|
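As a rough illustration of the naming convention, the displayed name can be thought of as the class name without a trailing Exec. The helper below is hypothetical (`NodeNames.displayName` is not a Spark API) and only mimics the effect described in the note.

```scala
// Hypothetical helper, for illustration only (not part of Spark).
object NodeNames {
  // Drops a trailing "Exec" if present; other names are returned unchanged.
  def displayName(className: String): String = className.stripSuffix("Exec")
}
```

For example, `NodeNames.displayName("LocalTableScanExec")` gives `LocalTableScan`.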
Name | Description |
---|---|
Flag that controls that prepare is executed only once. |
|
Flag that controls whether the subexpression elimination optimization is enabled or not. Used when the following physical operators are requested to execute (i.e. describe a distributed computation as an RDD of internal rows): |
Caution
|
FIXME SparkPlan is Serializable . Why? Is this because Dataset.cache persists executed query plans?
|
Compressing Partitions of UnsafeRows (to Byte Arrays) After Executing Physical Operator — getByteArrayRdd
Internal Method
getByteArrayRdd(n: Int = -1): RDD[Array[Byte]]
Caution
|
FIXME |
resetMetrics(): Unit
Note
|
resetMetrics is used when…FIXME
|
executeCollectIterator(): (Long, Iterator[InternalRow])
executeCollectIterator
…FIXME
Note
|
executeCollectIterator is used when…FIXME
|
The SparkPlan contract requires that concrete physical operators implement doExecute.
doExecute(): RDD[InternalRow]
doExecute allows a physical operator to describe a distributed computation (that is a runtime representation of the operator in particular and a structured query in general) as an RDD of internal binary rows, i.e. RDD[InternalRow], and thus execute.
Name | Description |
---|---|
doExecuteBroadcast | By default reports a … Executed exclusively as part of executeBroadcast to return the result of a structured query as a broadcast variable. |
doPrepare | Prepares a physical operator for execution. Executed exclusively as part of prepare and is supposed to set some state up before executing a query (e.g. BroadcastExchangeExec to broadcast a relation asynchronously or SubqueryExec to execute a child operator). |
outputOrdering | Specifies output data ordering, i.e. how data is sorted (ordered) in each partition. outputOrdering: Seq[SortOrder] = Nil Used exclusively when… |
 | Specifies output data partitioning, i.e. how output data is partitioned (across executors). |
requiredChildDistribution | Specifies the required partition requirements (aka child output distributions) of the input data, i.e. how the output of child physical operators is split across partitions. requiredChildDistribution: Seq[Distribution] Defaults to an UnspecifiedDistribution for all of the child operators. Used exclusively when… |
requiredChildOrdering | Specifies the required sort ordering for each partition requirement (from child operators). requiredChildOrdering: Seq[Seq[SortOrder]] Defaults to no sort ordering for all of the physical operator’s children. Used exclusively when… |
executeQuery[T](query: => T): T
executeQuery executes the input query in a named scope (i.e. so that all RDDs created will have the same scope for visualization like web UI).
Internally, executeQuery calls prepare and waitForSubqueries followed by executing query.
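The call order described here (prepare for children first, then prepareSubqueries and doPrepare, then waitForSubqueries, and only then the query itself) can be traced with a toy, Spark-free model. Everything in the sketch is an assumption made for illustration: `ToyPlan`, `Leaf`, `Unary` and the string "results" are hypothetical, and the named RDD scope is left out entirely.

```scala
import scala.collection.mutable.ListBuffer

// Toy model of the execution template (NOT Spark's actual code).
object ToyPlanDemo {
  val log = ListBuffer.empty[String]

  abstract class ToyPlan(val name: String, val children: Seq[ToyPlan]) {
    private var prepared = false // guards prepare so it runs only once

    // Hooks that concrete operators provide or may override
    protected def doPrepare(): Unit = log += s"$name.doPrepare"
    protected def doExecute(): String

    protected def prepareSubqueries(): Unit = log += s"$name.prepareSubqueries"
    protected def waitForSubqueries(): Unit = log += s"$name.waitForSubqueries"

    final def prepare(): Unit = synchronized {
      if (!prepared) {
        children.foreach(_.prepare()) // children first
        prepareSubqueries()
        doPrepare()
        prepared = true
      }
    }

    protected final def executeQuery[T](query: => T): T = {
      prepare()
      waitForSubqueries()
      query // the by-name argument is evaluated only now
    }

    final def execute(): String = executeQuery(doExecute())
  }

  class Leaf(opName: String) extends ToyPlan(opName, Nil) {
    protected def doExecute(): String = { log += s"$name.doExecute"; name }
  }

  class Unary(opName: String, child: ToyPlan) extends ToyPlan(opName, Seq(child)) {
    protected def doExecute(): String = {
      val input = child.execute()
      log += s"$name.doExecute"
      s"$name($input)"
    }
  }

  // Executes a two-node plan and returns its result with the recorded call order.
  def run(): (String, List[String]) = {
    log.clear()
    val plan = new Unary("Project", new Leaf("Scan"))
    val result = plan.execute()
    (result, log.toList)
  }
}
```

In this model the child is prepared once, as part of preparing its parent, so its own later `execute()` call skips straight to waitForSubqueries and doExecute.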
Note
|
executeQuery is executed as part of execute, executeBroadcast and when a CodegenSupport-enabled physical operator produces Java source code.
|
executeBroadcast[T](): broadcast.Broadcast[T]
executeBroadcast returns the result of a structured query as a broadcast variable.
Internally, executeBroadcast calls doExecuteBroadcast inside executeQuery.
Note
|
executeBroadcast is called in BroadcastHashJoinExec, BroadcastNestedLoopJoinExec and ReusedExchangeExec physical operators.
|
metrics: Map[String, SQLMetric] = Map.empty
metrics is a registry of supported SQLMetrics by their names.
executeTake(n: Int): Array[InternalRow]
executeTake gives an array of up to the first n internal rows.
Internally, executeTake gets an RDD of byte arrays of n unsafe rows and scans the RDD partitions one by one until n is reached or all partitions were processed.
executeTake runs Spark jobs that take all the elements from the requested number of partitions, starting from the 0th partition and increasing their number by the spark.sql.limit.scaleUpFactor property (but at least twice as many).
Note
|
executeTake uses SparkContext.runJob to run a Spark job.
|
In the end, executeTake decodes the unsafe rows.
Note
|
executeTake gives an empty collection when n is 0 (and no Spark job is executed).
|
Note
|
executeTake may take and decode more unsafe rows than really needed since all unsafe rows from a partition are read (if the partition is included in the scan).
|
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 10)
// 8 groups over 10 partitions
// only 6 partitions hold numbers (see the output below)
val nums = spark.
  range(start = 0, end = 20, step = 1, numPartitions = 4).
  repartition($"id" % 8)
import scala.collection.Iterator
// Prints the partition id, size and elements of a single partition
val showElements = (it: Iterator[java.lang.Long]) => {
  val ns = it.toSeq
  import org.apache.spark.TaskContext
  val pid = TaskContext.get.partitionId
  println(s"[partition: $pid][size: ${ns.size}] ${ns.mkString(" ")}")
}
// ordered by partition id manually for demo purposes
scala> nums.foreachPartition(showElements)
[partition: 0][size: 2] 4 12
[partition: 1][size: 2] 7 15
[partition: 2][size: 0]
[partition: 3][size: 0]
[partition: 4][size: 0]
[partition: 5][size: 5] 0 6 8 14 16
[partition: 6][size: 0]
[partition: 7][size: 3] 3 11 19
[partition: 8][size: 5] 2 5 10 13 18
[partition: 9][size: 3] 1 9 17
scala> println(spark.sessionState.conf.limitScaleUpFactor)
4
// How many Spark jobs will the following queries run?
// Answers follow
scala> nums.take(13)
res0: Array[Long] = Array(4, 12, 7, 15, 0, 6, 8, 14, 16, 3, 11, 19, 2)
// The number of Spark jobs = 3
scala> nums.take(5)
res34: Array[Long] = Array(4, 12, 7, 15, 0)
// The number of Spark jobs = 4
scala> nums.take(3)
res38: Array[Long] = Array(4, 12, 7)
// The number of Spark jobs = 2
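The job counts in the demo above can be reproduced with a Spark-free model of the partition scan. This is a sketch under stated assumptions, not Spark's implementation: `TakeModel` and `jobsFor` are hypothetical names, and the estimate used after the first job (extrapolating from rows seen so far and overshooting by 50%) is assumed to resemble one Spark 2.x release and may differ in other versions. Partition sizes are taken from the foreachPartition output above.

```scala
// Simplified model of how many jobs a take(n) needs (NOT Spark's code).
object TakeModel {
  /** Number of simulated jobs needed to take `n` rows.
    * @param partitionSizes rows held by each partition, in partition order
    * @param scaleUpFactor  stand-in for spark.sql.limit.scaleUpFactor
    */
  def jobsFor(n: Int, partitionSizes: Seq[Int], scaleUpFactor: Int): Int =
    if (n == 0) 0 // nothing to take, so no job is run
    else {
      val factor = math.max(scaleUpFactor, 2) // grow at least twofold
      val totalParts = partitionSizes.length
      var collected = 0
      var partsScanned = 0
      var jobs = 0
      while (collected < n && partsScanned < totalParts) {
        var numPartsToTry = 1L // the first job scans partition 0 only
        if (partsScanned > 0) {
          numPartsToTry =
            if (collected == 0) partsScanned.toLong * factor
            else {
              // ASSUMPTION: extrapolate from rows seen so far, overshoot by 50%
              val estimate =
                math.max((1.5 * n * partsScanned / collected).toInt - partsScanned, 1)
              math.min(estimate.toLong, partsScanned.toLong * factor)
            }
        }
        val end = math.min(partsScanned + numPartsToTry, totalParts.toLong).toInt
        // Every scanned partition contributes at most n rows
        collected += (partsScanned until end).map(i => math.min(partitionSizes(i), n)).sum
        partsScanned = end
        jobs += 1
      }
      jobs
    }

  // Partition sizes from the demo output above
  val demoSizes: Seq[Int] = Seq(2, 2, 0, 0, 0, 5, 0, 3, 5, 3)
}
```

With `TakeModel.demoSizes` and a factor of 4, the model gives 3, 4 and 2 jobs for n = 13, 5 and 3, which matches the numbers quoted in the demo. The empty partitions 2 to 4 are why taking 5 rows costs more jobs than taking 13: the small estimate keeps landing on partitions with no data.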
executeCollect(): Array[InternalRow]
executeCollect executes the physical operator and compresses partitions of UnsafeRows as byte arrays (which yields an RDD[(Long, Array[Byte])], so no Spark job may have been submitted yet).
executeCollect runs a Spark job to collect the elements of the RDD and, for every pair in the result (a count and the bytes of a partition), decodes the byte arrays back to UnsafeRows and stores the decoded arrays together as the final Array[InternalRow].
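The round trip through (count, bytes) pairs can be sketched without Spark. The model below is an assumption-heavy illustration: strings stand in for UnsafeRows, the length-prefixed byte layout is invented for the sketch, compression is left out, and `CollectModel` with its methods is a hypothetical name.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Toy model (NOT Spark's code): each partition becomes a (row count, bytes) pair
// and the "driver" decodes all pairs into one array.
object CollectModel {
  def encodePartition(rows: Seq[String]): (Long, Array[Byte]) = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    rows.foreach { row =>
      val bytes = row.getBytes("UTF-8")
      out.writeInt(bytes.length) // length prefix (assumed layout)
      out.write(bytes)
    }
    out.flush()
    (rows.size.toLong, bos.toByteArray)
  }

  def decodePartition(count: Long, bytes: Array[Byte]): Seq[String] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    (0L until count).map { _ =>
      val buf = new Array[Byte](in.readInt())
      in.readFully(buf)
      new String(buf, "UTF-8")
    }
  }

  // Encodes every partition, then decodes the pairs in partition order.
  def collect(partitions: Seq[Seq[String]]): Array[String] =
    partitions
      .map(encodePartition)
      .flatMap { case (count, bytes) => decodePartition(count, bytes) }
      .toArray
}
```

Carrying the count next to the bytes lets the decoder know how many rows to read from each partition's byte array, including zero for an empty partition.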
Note
|
executeCollect runs a Spark job using Spark Core’s RDD.collect operator.
|
Note
|
executeCollect returns Array[InternalRow] , i.e. keeps the internal representation of rows unchanged and does not convert rows to JVM types.
|
executeCollectPublic(): Array[Row]
executeCollectPublic
…FIXME
Note
|
executeCollectPublic is used when…FIXME
|
newPredicate(expression: Expression, inputSchema: Seq[Attribute]): GenPredicate
newPredicate
…FIXME
Note
|
newPredicate is used when…FIXME
|
waitForSubqueries(): Unit
waitForSubqueries requests every ExecSubqueryExpression in runningSubqueries to updateResult.
Note
|
waitForSubqueries is used exclusively when a physical operator is requested to prepare itself for query execution (when it is executed or requested to executeBroadcast).
|