
Aggregation Execution Planning Strategy for Aggregate Physical Operators

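import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count
// spark-shell already provides spark (and these imports); getOrCreate returns that session
val spark: SparkSession = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._ // enables the $"id" column syntax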
// structured query with count aggregate function
val q = spark.range(5).
  groupBy($"id" % 2 as "group").
  agg(count("id") as "count")
import q.queryExecution.optimizedPlan
scala> println(optimizedPlan.numberedTreeString)
00 Aggregate [(id#0L % 2)], [(id#0L % 2) AS group#3L, count(1) AS count#8L]
01 +- Range (0, 5, step=1, splits=Some(8))

import spark.sessionState.planner.Aggregation
val physicalPlan = Aggregation.apply(optimizedPlan)

// HashAggregateExec selected
scala> println(physicalPlan.head.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#12L], functions=[count(1)], output=[group#3L, count#8L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#12L], functions=[partial_count(1)], output=[(id#0L % 2)#12L, count#14L])
02    +- PlanLater Range (0, 5, step=1, splits=Some(8))

Aggregation can select the following aggregate physical operators (in order of preference):

  1. HashAggregateExec

  2. ObjectHashAggregateExec

  3. SortAggregateExec

AggUtils.planAggregateWithOneDistinct Method

Caution
FIXME

Applying Aggregation Strategy to Logical Plan (Executing Aggregation) — apply Method

apply(plan: LogicalPlan): Seq[SparkPlan]
Note
apply is part of GenericStrategy Contract to generate a collection of SparkPlans for a given logical plan.
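To illustrate that contract, a strategy can be modelled as a function from a logical plan to zero or more candidate physical plans that returns Nil when it does not apply. This is a minimal sketch: LogicalNode, PhysicalNode, PlanLaterNode and AggregationLike are hypothetical, simplified stand-ins, not Spark's LogicalPlan, SparkPlan, PlanLater or Aggregation.

```scala
// Hypothetical, simplified model of the GenericStrategy contract.
// None of these types are Spark classes.
object StrategyContractSketch {
  sealed trait LogicalNode
  final case class LogicalAggregate(grouping: Seq[String], child: LogicalNode) extends LogicalNode
  final case class LogicalRange(start: Long, end: Long) extends LogicalNode

  sealed trait PhysicalNode
  final case class PhysicalAggregate(grouping: Seq[String], child: PhysicalNode) extends PhysicalNode
  // Stand-in for PlanLater: the child is left for other strategies to plan
  final case class PlanLaterNode(plan: LogicalNode) extends PhysicalNode

  // A strategy maps a logical plan to candidate physical plans
  trait Strategy {
    def apply(plan: LogicalNode): Seq[PhysicalNode]
  }

  object AggregationLike extends Strategy {
    def apply(plan: LogicalNode): Seq[PhysicalNode] = plan match {
      case LogicalAggregate(grouping, child) =>
        Seq(PhysicalAggregate(grouping, PlanLaterNode(child)))
      case _ => Nil // not an aggregate: this strategy does not apply
    }
  }
}
```

Returning Nil rather than failing lets the planner move on and try the next strategy for the same logical plan.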

apply uses the PhysicalAggregation extractor to find Aggregate logical operators and creates a single physical plan (a tree of aggregate physical operators) for every Aggregate logical operator found.

Internally, apply uses PhysicalAggregation to destructure an Aggregate logical operator (into a four-element tuple) and splits its aggregate expressions into distinct and non-distinct ones (using their isDistinct flag).
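The destructure-then-split step can be sketched with a plain Scala extractor object and Seq.partition. The model is hypothetical and heavily simplified: AggExpr, AggregateNode and PhysicalAggregationLike are made-up names standing in for AggregateExpression, Aggregate and PhysicalAggregation, and the real extractor also rewrites the expressions it returns.

```scala
// Hypothetical, simplified model; these are not Spark classes.
object DestructureSketch {
  // Stand-in for AggregateExpression with its isDistinct flag
  final case class AggExpr(function: String, input: String, isDistinct: Boolean)

  // Stand-in for an Aggregate logical operator
  final case class AggregateNode(
      grouping: Seq[String],
      aggregates: Seq[AggExpr],
      output: Seq[String],
      child: String)

  // Extractor that destructures an AggregateNode into a four-element tuple
  object PhysicalAggregationLike {
    def unapply(plan: Any): Option[(Seq[String], Seq[AggExpr], Seq[String], String)] =
      plan match {
        case AggregateNode(g, a, o, c) => Some((g, a, o, c))
        case _                         => None
      }
  }

  // Destructure, then split the aggregate expressions into (distinct, non-distinct)
  def split(plan: Any): Option[(Seq[AggExpr], Seq[AggExpr])] = plan match {
    case PhysicalAggregationLike(_, aggregates, _, _) =>
      Some(aggregates.partition(_.isDistinct))
    case _ => None
  }
}
```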

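apply then creates the physical operators using one of the following helper methods:

  1. AggUtils.planAggregateWithoutDistinct when none of the aggregate expressions is distinct

  2. AggUtils.planAggregateWithOneDistinct otherwise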
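The choice between the two helpers depends only on whether any aggregate expression is distinct. In this minimal sketch, AggExpr, planWithoutDistinct and planWithOneDistinct are hypothetical placeholders for AggregateExpression and the two AggUtils methods; the placeholders just describe which helper would be called.

```scala
// Hypothetical sketch of how apply picks a planning helper; not Spark code.
object DispatchSketch {
  final case class AggExpr(function: String, isDistinct: Boolean)

  // Placeholder for AggUtils.planAggregateWithoutDistinct
  def planWithoutDistinct(all: Seq[AggExpr]): String =
    s"planAggregateWithoutDistinct(${all.map(_.function).mkString(", ")})"

  // Placeholder for AggUtils.planAggregateWithOneDistinct
  def planWithOneDistinct(distinct: Seq[AggExpr], nonDistinct: Seq[AggExpr]): String =
    s"planAggregateWithOneDistinct(distinct=${distinct.map(_.function).mkString(", ")}; " +
      s"others=${nonDistinct.map(_.function).mkString(", ")})"

  def plan(aggregates: Seq[AggExpr]): String = {
    // Split by the isDistinct flag, then dispatch on whether any distinct one exists
    val (withDistinct, withoutDistinct) = aggregates.partition(_.isDistinct)
    if (withDistinct.isEmpty) planWithoutDistinct(aggregates)
    else planWithOneDistinct(withDistinct, withoutDistinct)
  }
}
```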

Selecting Aggregate Physical Operator Given Aggregate Expressions — AggUtils.createAggregate Internal Method

createAggregate(
  requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
  groupingExpressions: Seq[NamedExpression] = Nil,
  aggregateExpressions: Seq[AggregateExpression] = Nil,
  aggregateAttributes: Seq[Attribute] = Nil,
  initialInputBufferOffset: Int = 0,
  resultExpressions: Seq[NamedExpression] = Nil,
  child: SparkPlan): SparkPlan

Internally, createAggregate selects the aggregate physical operator to create based on the input aggregateExpressions, and then creates it.

Table 1. createAggregate’s Aggregate Physical Operator Selection Criteria (in execution order)

| Aggregate Physical Operator | Selection Criteria |
|---|---|
| HashAggregateExec | HashAggregateExec supports all aggBufferAttributes of the input aggregateExpressions. |
| ObjectHashAggregateExec | 1. The spark.sql.execution.useObjectHashAggregateExec internal flag is enabled (it is by default), and 2. ObjectHashAggregateExec supports the input aggregateExpressions. |
| SortAggregateExec | None of the above requirements can be met. |
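The selection order in the table can be expressed as a simple decision function. This is a hypothetical model: the boolean parameters stand in for the HashAggregateExec and ObjectHashAggregateExec support checks and for the spark.sql.execution.useObjectHashAggregateExec flag, and the case objects are labels, not Spark operators.

```scala
// Hypothetical model of createAggregate's operator selection; not Spark code.
object SelectionSketch {
  sealed trait AggOperator
  case object HashAggregate extends AggOperator
  case object ObjectHashAggregate extends AggOperator
  case object SortAggregate extends AggOperator

  def select(
      hashSupportsBuffers: Boolean,        // HashAggregateExec supports all aggBufferAttributes
      useObjectHashAggregateExec: Boolean, // value of the internal flag
      objectHashSupportsAggs: Boolean      // ObjectHashAggregateExec supports the aggregate expressions
  ): AggOperator =
    if (hashSupportsBuffers) HashAggregate
    else if (useObjectHashAggregateExec && objectHashSupportsAggs) ObjectHashAggregate
    else SortAggregate // fallback when neither of the above applies
}
```

Checking the criteria in this fixed order is what makes the list a preference order: SortAggregateExec is only reached when both hash-based operators are ruled out.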

Note

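createAggregate is used in the AggUtils planning methods, e.g. planAggregateWithoutDistinct (for both its partial and final aggregate operators) and planAggregateWithOneDistinct.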

Creating Physical Plan with Two Aggregate Physical Operators for Partial and Final Aggregations — AggUtils.planAggregateWithoutDistinct Method

planAggregateWithoutDistinct(
  groupingExpressions: Seq[NamedExpression],
  aggregateExpressions: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  child: SparkPlan): Seq[SparkPlan]

planAggregateWithoutDistinct is a two-step physical operator generator.

planAggregateWithoutDistinct first creates an aggregate physical operator with aggregateExpressions in Partial mode (for partial aggregations).

Note
requiredChildDistributionExpressions for the aggregate physical operator of the partial aggregation "stage" is None, i.e. the partial aggregation places no distribution requirement on its child.

In the end, planAggregateWithoutDistinct creates another aggregate physical operator (of the same type as before), with aggregateExpressions now in Final mode (for final aggregations). This final aggregate physical operator becomes the parent of the partial one.
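To see why the two stages compose, here is a plain-Scala simulation of what count(1) computes in each mode for the example query above (grouping by id % 2). It assumes, hypothetically, that the five ids from spark.range(5) sit in two partitions; it does not use Spark. Partial mode produces a per-partition count for each group, and Final mode merges the partial counts of each group by summing them.

```scala
// Plain-Scala simulation of Partial and Final count(1); not Spark code.
object PartialFinalCountSketch {
  // Partial mode: one (group -> partial count) map per partition
  def partial(partition: Seq[Long]): Map[Long, Long] =
    partition.groupBy(_ % 2).map { case (group, rows) => group -> rows.size.toLong }

  // Final mode: merge the partial counts of each group by summing them
  def merge(partials: Seq[Map[Long, Long]]): Map[Long, Long] =
    partials.flatten.groupBy(_._1).map { case (group, counts) => group -> counts.map(_._2).sum }

  // Hypothetical partitioning of spark.range(5) into two partitions
  val partitions: Seq[Seq[Long]] = Seq(Seq(0L, 1L, 2L), Seq(3L, 4L))

  // Group 0 (ids 0, 2, 4) -> 3 and group 1 (ids 1, 3) -> 2
  val result: Map[Long, Long] = merge(partitions.map(partial))
}
```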

Note
requiredChildDistributionExpressions for the parent aggregate physical operator for final aggregation "stage" are the attributes of groupingExpressions.
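The shape of the plan that planAggregateWithoutDistinct returns, including each stage's required child distribution, can be modelled as below. Mode, Node, Child and AggregateOp are hypothetical stand-ins for AggregateMode and the aggregate physical operators, and the sketch assumes createAggregate picks the same operator type for both stages.

```scala
// Hypothetical model of the two-stage plan built by planAggregateWithoutDistinct
object TwoStageSketch {
  sealed trait Mode
  case object Partial extends Mode
  case object Final extends Mode

  sealed trait Node
  final case class Child(name: String) extends Node
  final case class AggregateOp(
      mode: Mode,
      requiredChildDistribution: Option[Seq[String]], // None = no requirement
      grouping: Seq[String],
      child: Node) extends Node

  def planWithoutDistinct(grouping: Seq[String], child: Node): Seq[Node] = {
    // Step 1: partial aggregation with no required child distribution
    val partialAggregate = AggregateOp(Partial, None, grouping, child)
    // Step 2: final aggregation that requires rows clustered by the grouping attributes
    val finalAggregate = AggregateOp(Final, Some(grouping), grouping, partialAggregate)
    // Only the root (final) operator is returned; the partial one is its child
    Seq(finalAggregate)
  }
}
```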
Note
planAggregateWithoutDistinct is used exclusively when Aggregation execution planning strategy is executed (with no AggregateExpressions being distinct).