DataSourceStrategy is an execution planning strategy (of SparkPlanner) that plans LogicalRelation logical operators as RowDataSourceScanExec physical operators (possibly under FilterExec and ProjectExec operators).

| Logical Operator | Description |
|---|---|
| LogicalRelation with a CatalystScan relation | Uses pruneFilterProjectRaw |
| LogicalRelation with a PrunedFilteredScan relation | Uses pruneFilterProject. Matches JDBCRelation exclusively |
| LogicalRelation with a PrunedScan relation | Uses pruneFilterProject |
| LogicalRelation with a TableScan relation | Creates a RowDataSourceScanExec directly (requesting the TableScan relation for its rows using buildScan). Matches KafkaRelation exclusively |
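
The dispatch in the table above can be illustrated with a plain Scala pattern match on the relation's type. This is a minimal sketch: the traits below are hypothetical stand-ins, not Spark's `BaseRelation` or its scan interfaces, and the case order simply follows the table rows, which is an assumption about precedence when a relation mixes in more than one trait.

```scala
object PlanningDispatchSketch {
  // Hypothetical stand-ins for BaseRelation and its scan mix-in traits
  trait Relation
  trait CatalystScan extends Relation
  trait PrunedFilteredScan extends Relation
  trait PrunedScan extends Relation
  trait TableScan extends Relation

  // Cases are tried top to bottom, in the order of the table rows
  def planningPath(relation: Relation): String = relation match {
    case _: CatalystScan       => "pruneFilterProjectRaw"
    case _: PrunedFilteredScan => "pruneFilterProject"
    case _: PrunedScan         => "pruneFilterProject"
    case _: TableScan          => "RowDataSourceScanExec (direct)"
    case _                     => "not planned by this sketch"
  }
}
```

Because the match is ordered, a relation that mixes in both PrunedScan and TableScan takes the PrunedScan path in this sketch.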
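
```scala
// In spark-shell, where `spark` is the active SparkSession
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
val strategy = DataSourceStrategy(spark.sessionState.conf)

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
val plan: LogicalPlan = ???  // placeholder: the logical plan to be planned
// Applying the strategy gives candidate physical plans; head fails if none matched
val sparkPlan = strategy(plan).head
```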
DataSourceStrategy uses the PhysicalOperation Scala extractor object to destructure a logical query plan.
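
The idea of destructuring a plan with an extractor object can be shown with a toy version. The plan nodes and the `PhysicalOp` extractor below are hypothetical simplifications, not Catalyst classes: unlike Spark's PhysicalOperation, this sketch does not substitute aliases or check that expressions are deterministic. It only collects the projection list and the filter conditions sitting on top of a leaf relation.

```scala
object PhysicalOperationSketch {
  // Hypothetical, simplified logical plan nodes (not Catalyst's classes)
  sealed trait Plan
  final case class Relation(name: String, output: Seq[String]) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan
  final case class Project(columns: Seq[String], child: Plan) extends Plan

  // Scala extractor object: returns (projects, filters, leaf) for a Project/Filter chain
  object PhysicalOp {
    def unapply(plan: Plan): Option[(Seq[String], Seq[String], Plan)] = {
      val (projects, filters, leaf) = collect(plan)
      // Without a Project on top, the projection is the leaf's full output
      Some((projects.getOrElse(outputOf(leaf)), filters, leaf))
    }

    private def collect(plan: Plan): (Option[Seq[String]], Seq[String], Plan) = plan match {
      case Project(cols, child) =>
        val (_, filters, leaf) = collect(child)
        (Some(cols), filters, leaf) // the outermost projection wins
      case Filter(cond, child) =>
        val (projects, filters, leaf) = collect(child)
        (projects, filters :+ cond, leaf)
      case leaf => (None, Nil, leaf)
    }

    private def outputOf(plan: Plan): Seq[String] = plan match {
      case Relation(_, output) => output
      case _                   => Nil
    }
  }
}
```

A planning rule can then match on `PhysicalOp(projects, filters, Relation(name, _))` in a single case instead of walking the Project and Filter nodes by hand.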

```scala
pruneFilterProject(
  relation: LogicalRelation,
  projects: Seq[NamedExpression],
  filterPredicates: Seq[Expression],
  scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow]): SparkPlan
```
pruneFilterProject simply calls pruneFilterProjectRaw with scanBuilder, ignoring the Seq[Expression] input parameter.
pruneFilterProject is used when DataSourceStrategy execution planning strategy is executed (for LogicalRelation logical operators with a PrunedFilteredScan or a PrunedScan).
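
The "ignoring the Seq[Expression] input parameter" step amounts to adapting a two-argument scan builder to the three-argument shape that pruneFilterProjectRaw expects. In this minimal sketch the type aliases are hypothetical stand-ins (strings instead of Attribute, Expression, Filter, and a `Seq[String]` instead of `RDD[InternalRow]`); only the shape of the adapter is the point.

```scala
object ScanBuilderAdapterSketch {
  // Hypothetical stand-ins for Attribute, Expression, Filter and RDD[InternalRow]
  type Attr = String
  type Expr = String
  type DsFilter = String
  type Rows = Seq[String]

  type TwoArgScanBuilder = (Seq[Attr], Array[DsFilter]) => Rows
  type ThreeArgScanBuilder = (Seq[Attr], Seq[Expr], Seq[DsFilter]) => Rows

  // Wrap the two-argument builder: drop the Seq[Expr] argument and
  // pass the pushed filters on as an array
  def adapt(scanBuilder: TwoArgScanBuilder): ThreeArgScanBuilder =
    (requestedColumns, _, pushedFilters) =>
      scanBuilder(requestedColumns, pushedFilters.toArray)
}
```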
Selecting Catalyst Expressions Convertible to Data Source Filter Predicates (and Handled by BaseRelation) — selectFilters

```scala
selectFilters(
  relation: BaseRelation,
  predicates: Seq[Expression]): (Seq[Expression], Seq[Filter], Set[Filter])
```
selectFilters builds a map of the Catalyst predicate expressions (from the input predicates) that can be translated to data source filter predicates.

selectFilters then requests the input BaseRelation for the unhandled filters (using BaseRelation.unhandledFilters), out of the convertible ones that selectFilters built the map with.
In the end, selectFilters returns a 3-element tuple with the following:

1. Inconvertible and unhandled Catalyst predicate expressions
2. All converted data source filters
3. Pushed-down data source filters (that the input BaseRelation can handle)
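
The three parts of the tuple can be reproduced with a small model. Everything here is hypothetical: a three-case predicate language stands in for Catalyst expressions, two filter classes stand in for `org.apache.spark.sql.sources.Filter`, and `Relation.unhandledFilters` only mimics the role of `BaseRelation.unhandledFilters`.

```scala
object SelectFiltersSketch {
  // Hypothetical mini predicate language (stand-in for Catalyst expressions)
  sealed trait Expr extends Product with Serializable
  final case class EqualTo(column: String, value: Any) extends Expr
  final case class IsNotNull(column: String) extends Expr
  final case class UdfCall(name: String) extends Expr // has no data source counterpart

  // Hypothetical data source filters
  sealed trait Filter extends Product with Serializable
  final case class EqualToFilter(attribute: String, value: Any) extends Filter
  final case class IsNotNullFilter(attribute: String) extends Filter

  // Stand-in for BaseRelation.unhandledFilters
  trait Relation { def unhandledFilters(filters: Array[Filter]): Array[Filter] }

  def translate(e: Expr): Option[Filter] = e match {
    case EqualTo(c, v) => Some(EqualToFilter(c, v))
    case IsNotNull(c)  => Some(IsNotNullFilter(c))
    case _             => None
  }

  def selectFilters(relation: Relation, predicates: Seq[Expr]): (Seq[Expr], Seq[Filter], Set[Filter]) = {
    // Map every convertible predicate to its data source filter
    val translated: Map[Expr, Filter] =
      predicates.flatMap(p => translate(p).map(f => p -> f)).toMap
    val converted = translated.values.toSeq
    // Ask the relation which converted filters it cannot evaluate itself
    val unhandled = relation.unhandledFilters(converted.toArray).toSet
    // Predicates Spark still has to evaluate: inconvertible plus unhandled
    val nonConvertible = predicates.filterNot(translated.contains)
    val unhandledPredicates = translated.collect { case (p, f) if unhandled.contains(f) => p }
    (nonConvertible ++ unhandledPredicates, converted, converted.toSet -- unhandled)
  }
}
```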
selectFilters is used exclusively when DataSourceStrategy execution planning strategy is requested to create a RowDataSourceScanExec physical operator, possibly under FilterExec and ProjectExec operators (which happens when DataSourceStrategy is executed and when pruneFilterProject is used).

```scala
translateFilter(predicate: Expression): Option[Filter]
```
translateFilter translates a Catalyst expression into the corresponding data source Filter predicate if possible. If not, translateFilter returns None.
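
| Catalyst Expression | Filter Predicate |
|---|---|
| EqualTo | EqualTo |
| EqualNullSafe | EqualNullSafe |
| GreaterThan | GreaterThan |
| LessThan | LessThan |
| GreaterThanOrEqual | GreaterThanOrEqual |
| LessThanOrEqual | LessThanOrEqual |
| InSet | In |
| In | In |
| IsNull | IsNull |
| IsNotNull | IsNotNull |
| And | And |
| Or | Or |
| Not | Not |
| StartsWith | StringStartsWith |
| EndsWith | StringEndsWith |
| Contains | StringContains |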
The Catalyst expressions and their corresponding data source filter predicates have the same names in most cases but belong to different Scala packages, namely org.apache.spark.sql.catalyst.expressions and org.apache.spark.sql.sources, respectively.
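
The translation can be sketched as a recursive function returning Option. The expression and filter classes below are hypothetical miniatures; a nested `sources` object imitates the "same name, different package" situation. The sketch only converts comparisons between a column and a literal (flipping the comparison when the literal is on the left), and it requires both sides of an And to be convertible, which is an assumption of this model.

```scala
object TranslateFilterSketch {
  // Hypothetical mini Catalyst-like expressions
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: Any) extends Expr
  final case class GreaterThan(left: Expr, right: Expr) extends Expr
  final case class LessThan(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Not(child: Expr) extends Expr
  final case class Upper(child: Expr) extends Expr // no data source counterpart

  // Hypothetical data source filters: same names, different "package"
  object sources {
    sealed trait Filter
    final case class GreaterThan(attribute: String, value: Any) extends Filter
    final case class LessThan(attribute: String, value: Any) extends Filter
    final case class And(left: Filter, right: Filter) extends Filter
    final case class Not(child: Filter) extends Filter
  }

  def translateFilter(predicate: Expr): Option[sources.Filter] = predicate match {
    case GreaterThan(Attr(a), Lit(v)) => Some(sources.GreaterThan(a, v))
    case GreaterThan(Lit(v), Attr(a)) => Some(sources.LessThan(a, v)) // literal on the left: flip
    case LessThan(Attr(a), Lit(v))    => Some(sources.LessThan(a, v))
    case LessThan(Lit(v), Attr(a))    => Some(sources.GreaterThan(a, v))
    case And(l, r) => // both sides must translate, otherwise nothing is pushed
      for (lf <- translateFilter(l); rf <- translateFilter(r)) yield sources.And(lf, rf)
    case Not(child) => translateFilter(child).map(sources.Not(_))
    case _          => None
  }
}
```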

```scala
toCatalystRDD(
  relation: LogicalRelation,
  output: Seq[Attribute],
  rdd: RDD[Row]): RDD[InternalRow]
toCatalystRDD(relation: LogicalRelation, rdd: RDD[Row]) // (1)
```

1. Calls the former with the output of the LogicalRelation
toCatalystRDD branches off per the needConversion flag of the BaseRelation of the input LogicalRelation.

When enabled (true), toCatalystRDD converts the objects inside Rows to Catalyst types.

Otherwise, toCatalystRDD simply casts the input RDD[Row] to an RDD[InternalRow] (a plain, unchecked Scala type cast with no per-row work).
The needConversion flag is enabled (true) by default.
toCatalystRDD is used when DataSourceStrategy execution planning strategy is executed.
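
The difference between the two branches can be modeled on a local collection. In this minimal sketch `Row`, `InternalRow` and `Utf8` are hypothetical stand-ins (the last one for Catalyst's UTF8String), and only Strings are converted; the real conversion covers all Catalyst types. The `else` branch shows why the cast is cheap: generic element types are erased at runtime, so nothing is checked or copied.

```scala
object ToCatalystSketch {
  // Hypothetical stand-ins: Row holds external values, InternalRow Catalyst values
  final case class Row(values: Seq[Any])
  final case class InternalRow(values: Seq[Any])
  final case class Utf8(bytes: Vector[Byte]) // stand-in for UTF8String

  private def toCatalystValue(v: Any): Any = v match {
    case s: String => Utf8(s.getBytes("UTF-8").toVector)
    case other     => other
  }

  // `rows` plays the role of RDD[Row]
  def toCatalyst(needConversion: Boolean, rows: Seq[Any]): Seq[InternalRow] =
    if (needConversion)
      rows.map { case Row(values) => InternalRow(values.map(toCatalystValue)) }
    else
      // Unchecked cast: trusts the relation to have produced InternalRows already
      rows.asInstanceOf[Seq[InternalRow]]
}
```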
Creating RowDataSourceScanExec Physical Operator for LogicalRelation (Possibly Under FilterExec and ProjectExec Operators) — pruneFilterProjectRaw Internal Method

```scala
pruneFilterProjectRaw(
  relation: LogicalRelation,
  projects: Seq[NamedExpression],
  filterPredicates: Seq[Expression],
  scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]): SparkPlan
```
pruneFilterProjectRaw converts a LogicalRelation leaf logical operator into a RowDataSourceScanExec leaf physical operator, possibly as the child of FilterExec and ProjectExec unary physical operators.
pruneFilterProjectRaw is almost like SparkPlanner.pruneFilterProject.
Internally, pruneFilterProjectRaw:

1. Splits the input filterPredicates expressions to select the Catalyst expressions that can be converted to data source filter predicates (and handled by the BaseRelation of the LogicalRelation).
2. Combines all expressions that are neither convertible to data source filters nor handled by the relation using the And binary expression. The result is the so-called filterCondition that is eventually used to create a FilterExec physical operator (if non-empty).
3. Creates a RowDataSourceScanExec leaf physical operator.
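
Step 2 boils down to folding the remaining predicates with And. In this minimal sketch `Expr`, `Pred` and `And` are hypothetical stand-ins for Catalyst expressions. `reduceLeftOption` returns None for an empty list, which corresponds to the case where no FilterExec is needed.

```scala
object FilterConditionSketch {
  // Hypothetical stand-ins for Catalyst expressions
  sealed trait Expr
  final case class Pred(sql: String) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // Fold the predicates Spark must evaluate itself into one left-nested condition
  def filterCondition(unhandled: Seq[Expr]): Option[Expr] =
    unhandled.reduceLeftOption[Expr](And(_, _))
}
```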
If column pruning alone is enough to produce the requested projection, and the columns of this projection are enough to evaluate all filter conditions, pruneFilterProjectRaw creates a FilterExec unary physical operator (with the unhandled predicate expressions and the RowDataSourceScanExec leaf physical operator as the child). In this case no extra ProjectExec unary physical operator is created.

Otherwise, pruneFilterProjectRaw creates a FilterExec unary physical operator (with the unhandled predicate expressions and the RowDataSourceScanExec leaf physical operator as the child) that in turn becomes the child of a new ProjectExec unary physical operator.

In both cases, the FilterExec is only created when the filterCondition is non-empty; otherwise the RowDataSourceScanExec is used as is.
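
The choice between the two plan shapes can be modeled with a small decision function. The physical node classes and the `ProjectItem` encoding (a plain column reference vs. a computed expression) are hypothetical simplifications. In the fallback branch this sketch scans every column referenced by the projection or the filters and does not try to skip columns that only pushed-down filters need.

```scala
object PlanShapeSketch {
  // Hypothetical, simplified physical plan nodes
  sealed trait Physical extends Product with Serializable
  final case class Scan(columns: Seq[String]) extends Physical
  final case class FilterOp(condition: String, child: Physical) extends Physical
  final case class ProjectOp(output: Seq[String], child: Physical) extends Physical

  // A projection item is either a plain column or a computed, aliased expression
  sealed trait ProjectItem
  final case class Column(name: String) extends ProjectItem
  final case class Computed(alias: String, references: Set[String]) extends ProjectItem

  def plan(projects: Seq[ProjectItem], filterColumns: Set[String], filterCondition: Option[String]): Physical = {
    val projectColumns = projects.collect { case Column(n) => n }
    // Column pruning is enough: only plain, distinct columns that cover the filters
    val pruningOnly =
      projectColumns.size == projects.size &&
      projectColumns.distinct.size == projectColumns.size &&
      filterColumns.subsetOf(projectColumns.toSet)
    if (pruningOnly) {
      val scan = Scan(projectColumns)
      filterCondition.map(FilterOp(_, scan)).getOrElse(scan) // no ProjectOp
    } else {
      val referenced = projects.flatMap {
        case Column(n)         => Set(n)
        case Computed(_, refs) => refs
      }.toSet
      val scan = Scan((referenced ++ filterColumns).toSeq.sorted)
      val filtered = filterCondition.map(FilterOp(_, scan)).getOrElse(scan)
      ProjectOp(projects.map { case Column(n) => n; case Computed(a, _) => a }, filtered)
    }
  }
}
```

For example, projecting `id` while filtering on `age` forces the fallback branch: the scan reads both columns and a ProjectOp narrows the result back to `id`.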
pruneFilterProjectRaw is used when DataSourceStrategy execution planning strategy is executed (for a LogicalRelation with a CatalystScan relation) and by pruneFilterProject (when executed for a LogicalRelation with a PrunedFilteredScan or a PrunedScan relation).