Skip to content

Latest commit

 

History

History
87 lines (65 loc) · 3.94 KB

spark-sql-TungstenAggregationIterator.adoc

File metadata and controls

87 lines (65 loc) · 3.94 KB

TungstenAggregationIterator — Iterator of UnsafeRows for HashAggregateExec Physical Operator

TungstenAggregationIterator is a custom AggregationIterator that is created when HashAggregateExec aggregate physical operator is executed (to process rows per partition).

val q = spark.range(10).
  groupBy('id % 2 as "group").
  agg(sum("id") as "sum")
val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#11L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#11L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#11L, sum#13L])
02    +- Range (0, 10, step=1, splits=8)

import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
val hashAggExecRDD = hashAggExec.execute

// MapPartitionsRDD is in private[spark] scope
// Use :paste -raw for the following helper object
package org.apache.spark
object AccessPrivateSpark {
  import org.apache.spark.rdd.RDD
  def mapPartitionsRDD[T](hashAggExecRDD: RDD[T]) = {
    import org.apache.spark.rdd.MapPartitionsRDD
    hashAggExecRDD.asInstanceOf[MapPartitionsRDD[_, _]]
  }
}
// END :paste -raw

import org.apache.spark.AccessPrivateSpark
val mpRDD = AccessPrivateSpark.mapPartitionsRDD(hashAggExecRDD)
val f = mpRDD.iterator(_, _)

import org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
// FIXME How to show that TungstenAggregationIterator is used?

processInputs Internal Method

processInputs(fallbackStartsAt: (Int, Int)): Unit
Caution
FIXME
Note
processInputs is used when TungstenAggregationIterator is created (and sets the internal flags to indicate whether to use a hash-based aggregation or, in the worst case, a sort-based aggregation when there is not enough memory for groups and their buffers).

switchToSortBasedAggregation Internal Method

switchToSortBasedAggregation(): Unit
Caution
FIXME
Note
switchToSortBasedAggregation is used when TungstenAggregationIterator processInputs (and externalSorter is used).

next Method

Caution
FIXME

hasNext Method

Caution
FIXME

Creating TungstenAggregationIterator Instance

TungstenAggregationIterator takes the following when created:

TungstenAggregationIterator initializes the internal registries and counters.