PrunedFilteredScan
`PrunedFilteredScan` is the contract of `BaseRelation`s that support column pruning (i.e. eliminating unneeded columns) and filter pushdown (i.e. filtering using selected predicates only).
package org.apache.spark.sql.sources
/**
 * Contract for relations whose scan can be restricted to a subset of columns
 * (column pruning) and can be handed filters to apply (filter pushdown).
 */
trait PrunedFilteredScan {
/**
 * Builds the scan over the relation's data.
 *
 * @param requiredColumns names of the columns the scan is asked to produce
 * @param filters filters offered to the scan for pushdown
 * @return the scanned rows as an `RDD[Row]`
 */
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
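| Property | Description |
|---|---|
| `buildScan` | Builds a distributed data scan with column pruning and filter pushdown. In other words, `buildScan` creates an `RDD[Row]` that represents the scan over the relation's data, given the required columns and the filters to push down. Used exclusively when the `DataSourceStrategy` execution planning strategy plans a `LogicalRelation` with a `PrunedFilteredScan` relation. |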
> **Note:** `PrunedFilteredScan` is a "lighter" and stable version of the `CatalystScan` contract.
> **Note:** `JDBCRelation` is the one and only known implementation of the `PrunedFilteredScan` contract in Spark SQL.
// Use :paste to define MyBaseRelation case class
// BEGIN
import org.apache.spark.sql.sources.PrunedFilteredScan
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{StructField, StructType, StringType}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.Filter
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
/**
 * Minimal demo relation that mixes in `PrunedFilteredScan`.
 *
 * It declares a single string column `a` and prints the arguments of every
 * `buildScan` call, which makes it visible when the scan is requested and
 * with which columns and filters.
 *
 * @param sqlContext the `SQLContext` this relation is bound to
 */
case class MyBaseRelation(sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {

  /** The relation exposes exactly one column, `a`, of string type. */
  override def schema: StructType = StructType(StructField("a", StringType) :: Nil)

  /**
   * Logs the requested columns and offered filters, then returns five rows.
   *
   * The arguments are only printed; they are not used to prune columns or to
   * filter rows.
   *
   * @param requiredColumns names of the columns requested for the scan
   * @param filters filters offered for pushdown
   * @return five single-column rows holding the strings "0" through "4"
   */
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    println(s">>> [buildScan] requiredColumns = ${requiredColumns.mkString(",")}")
    println(s">>> [buildScan] filters = ${filters.mkString(",")}")
    import sqlContext.implicits._
    // The rows have to agree with `schema` (one StringType column named `a`):
    // Int values under a StringType schema cannot be converted once the scan
    // is actually executed, so the values are rendered as strings here.
    (0 to 4).map(_.toString).toDF("a").rdd
  }
}
// END
// Instantiate the demo relation with the session's SQLContext
// (`spark` is assumed to be the SparkSession provided by spark-shell)
val scan = MyBaseRelation(spark.sqlContext)
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
// Wrap the relation in a LogicalRelation so that it can be handled as a logical plan
val plan: LogicalPlan = LogicalRelation(scan)
scala> println(plan.numberedTreeString)
00 Relation[a#1] MyBaseRelation(org.apache.spark.sql.SQLContext@4a57ad67)
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
// Create the planning strategy from the session's SQL configuration
val strategy = DataSourceStrategy(spark.sessionState.conf)
// Apply the strategy to the logical plan and keep the first plan it returns;
// the [buildScan] messages quoted in the comments right after this line are printed at this step
val sparkPlan = strategy(plan).head
// >>> [buildScan] requiredColumns = a
// >>> [buildScan] filters =
scala> println(sparkPlan.numberedTreeString)
00 Scan MyBaseRelation(org.apache.spark.sql.SQLContext@4a57ad67) [a#8] PushedFilters: [], ReadSchema: struct<a:string>