NullPropagation is a logical optimization (aka Rule[LogicalPlan]) in Optimizer.
Note: NullPropagation is one of the optimizations in the fixed-point Operator Optimizations rule batch in Optimizer.
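Being a Rule[LogicalPlan], NullPropagation can also be applied to an analyzed logical plan by hand, which is handy for observing the rule in isolation. A minimal sketch, assuming a spark-shell session and a Spark version in which NullPropagation is an object (in some versions it is a case class that takes SQLConf):

import org.apache.spark.sql.catalyst.optimizer.NullPropagation
import org.apache.spark.sql.functions.{count, lit}

// A query whose Count input is always null
val query = spark.range(5).select(count(lit(null)) as "count")

// Apply the rule directly to the analyzed (not yet optimized) logical plan
val analyzedPlan = query.queryExecution.analyzed
val afterNullPropagation = NullPropagation(analyzedPlan)
println(afterNullPropagation.numberedTreeString)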
NullPropagation rewrites Count aggregate expressions whose input expressions are all null (and so can never contribute to the count) to Cast(Literal(0L)), i.e. a constant 0 computed at optimization time.
val table = (0 to 9).toDF("num").as[Int]
// NullPropagation applied
scala> table.select(countDistinct($"num" === null)).explain(true)
== Parsed Logical Plan ==
'Project [count(distinct ('num = null)) AS count(DISTINCT (num = NULL))#45]
+- Project [value#1 AS num#3]
+- LocalRelation [value#1]
== Analyzed Logical Plan ==
count(DISTINCT (num = NULL)): bigint
Aggregate [count(distinct (num#3 = cast(null as int))) AS count(DISTINCT (num = NULL))#45L]
+- Project [value#1 AS num#3]
+- LocalRelation [value#1]
== Optimized Logical Plan ==
Aggregate [0 AS count(DISTINCT (num = NULL))#45L] // <-- HERE
+- LocalRelation
== Physical Plan ==
*HashAggregate(keys=[], functions=[], output=[count(DISTINCT (num = NULL))#45L])
+- Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[], output=[])
+- LocalTableScan
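Since the aggregate is rewritten to the constant 0, the query answers without evaluating a single row. A quick sanity check (a sketch, assuming the table defined above and spark.implicits._ in scope):

// count(distinct (num = null)) was rewritten to 0 at optimization time
val result = table.select(countDistinct($"num" === null)).as[Long].head
assert(result == 0L)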
NullPropagation rewrites non-distinct Count aggregate expressions whose input expressions are all non-nullable to Count(Literal(1)), i.e. count(1), since such a count never has to skip null rows.
val table = (0 to 9).toDF("num").as[Int]
// NullPropagation applied
// current_timestamp() is a non-nullable expression (see the note below)
val query = table.select(count(current_timestamp()) as "count")
scala> println(query.queryExecution.optimizedPlan)
Aggregate [count(1) AS count#64L]
+- LocalRelation
// NullPropagation skipped
val tokens = Seq((0, null), (1, "hello")).toDF("id", "word")
val query = tokens.select(count("word") as "count")
scala> println(query.queryExecution.optimizedPlan)
Aggregate [count(word#55) AS count#71L]
+- LocalRelation [word#55]
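The deciding factor is the nullability of the count's input. A quick check of why the rewrite is skipped for tokens (a sketch, assuming the tokens Dataset defined above):

// word comes from a Seq that contains a null, so Spark marks the column
// nullable and NullPropagation has to keep Count(word) to skip null rows
assert(tokens.schema("word").nullable)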
Note: count("*") is in fact a Count over Literal(1), i.e. count(1):

import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.functions.count

scala> count("*").expr.children(0).asInstanceOf[Count]
res0: org.apache.spark.sql.catalyst.expressions.aggregate.Count = count(1)
Note: current_timestamp() is a non-nullable expression:

import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp
import org.apache.spark.sql.functions.current_timestamp

scala> current_timestamp().expr.asInstanceOf[CurrentTimestamp].nullable
res38: Boolean = false
NullPropagation also triggers for null comparisons outside aggregates: it replaces num = null below with a null literal, after which other rules (e.g. PruneFilters) remove the Filter entirely and leave an empty LocalRelation.

val table = (0 to 9).toDF("num").as[Int]
val query = table.where('num === null)
scala> query.explain(extended = true)
== Parsed Logical Plan ==
'Filter ('num = null)
+- Project [value#1 AS num#3]
+- LocalRelation [value#1]
== Analyzed Logical Plan ==
num: int
Filter (num#3 = cast(null as int))
+- Project [value#1 AS num#3]
+- LocalRelation [value#1]
== Optimized Logical Plan ==
LocalRelation <empty>, [num#3]
== Physical Plan ==
LocalTableScan <empty>, [num#3]
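With the Filter gone and the relation empty, the query needs no data at all to answer. A minimal check (assuming the query defined above):

// The optimized plan is an empty LocalRelation, so no rows come back
assert(query.count() == 0)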