FileSourceScanExec is a DataSourceScanExec (and so indirectly a leaf physical operator) that represents a scan over collections of files (including Hive tables).

FileSourceScanExec is created exclusively for a LogicalRelation logical operator with a HadoopFsRelation when FileSourceStrategy execution planning strategy is executed (i.e. applied to a logical plan).
// Create a bucketed data source table
// It is one of the most complex examples of a LogicalRelation with a HadoopFsRelation
val tableName = "bucketed_4_id"
spark
.range(100)
.write
.bucketBy(4, "id")
.sortBy("id")
.mode("overwrite")
.saveAsTable(tableName)
val q = spark.table(tableName)
val sparkPlan = q.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkPlan.collectFirst { case exec: FileSourceScanExec => exec }.get
scala> :type scan
org.apache.spark.sql.execution.FileSourceScanExec
FileSourceScanExec uses a HashPartitioning or an UnknownPartitioning output partitioning scheme.
FileSourceScanExec is a ColumnarBatchScan and supports batch decoding only when the FileFormat (of the HadoopFsRelation) supports it.
FileSourceScanExec always gives the single inputRDD as the only RDD of internal rows (when WholeStageCodegenExec is executed).
FileSourceScanExec supports data source filters that are printed out to the console (at INFO logging level) and available as metadata (e.g. in web UI or explain).
Pushed Filters: [pushedDownFilters]
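For example (continuing the bucketed-table example above), a filter on the bucketing column is a candidate for push-down to the parquet data source and shows up in the FileScan line of the physical plan (the exact filter list depends on the data source and Spark version):

// Look for a line like
//   PushedFilters: [IsNotNull(id), EqualTo(id,50)]
// in the explain output
spark.table(tableName).where($"id" === 50).explain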
Key | Name (in web UI) | Description
---|---|---
metadataTime | metadata time (ms) |
numFiles | number of files |
As a DataSourceScanExec, FileSourceScanExec uses Scan for the prefix of the node name.
val fileScanExec: FileSourceScanExec = ... // see the example earlier
scala> println(fileScanExec.nodeName)
Scan csv
FileSourceScanExec uses File for nodeNamePrefix (that is used for the simple node description in query plans).
val fileScanExec: FileSourceScanExec = ... // see the example earlier
scala> println(fileScanExec.nodeNamePrefix)
File
scala> println(fileScanExec.simpleString)
FileScan csv [id#20,name#21,city#22] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/datasets/people.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,name:string,city:string>
Name | Description
---|---
metadata | Metadata (as a collection of key-value pairs)
pushedDownFilters | Data source filters that are dataFilters expressions converted to their respective filters
TIP: Enable INFO logging level for org.apache.spark.sql.execution.FileSourceScanExec logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.FileSourceScanExec=INFO
Refer to Logging.
createNonBucketedReadRDD(
readFile: (PartitionedFile) => Iterator[InternalRow],
selectedPartitions: Seq[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow]
createNonBucketedReadRDD…FIXME
NOTE: createNonBucketedReadRDD is used when…FIXME
selectedPartitions: Seq[PartitionDirectory]
selectedPartitions…FIXME
FileSourceScanExec takes the following when created:

- Output schema attributes
- partitionFilters Catalyst expressions
- dataFilters Catalyst expressions

FileSourceScanExec initializes the internal registries and counters.
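Continuing the example above, the creation-time arguments are available as fields of the FileSourceScanExec case class (field names as in recent Spark 2.x releases; the exact constructor differs across versions):

// Continuing the earlier example (the exact values depend on the query)
val outputAttrs = scan.output
val partitionFilters = scan.partitionFilters
val dataFilters = scan.dataFilters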
outputPartitioning: Partitioning
NOTE: outputPartitioning is part of SparkPlan Contract to specify data partitioning.
outputPartitioning may give a HashPartitioning output partitioning when bucketing is enabled.
CAUTION: FIXME
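Continuing the bucketed-table example above, the scan over a table bucketed by id into 4 buckets reports a HashPartitioning (the exact expression ID in the output varies per session):

scala> println(scan.outputPartitioning)
hashpartitioning(id#0L, 4)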
createBucketedReadRDD(
bucketSpec: BucketSpec,
readFile: (PartitionedFile) => Iterator[InternalRow],
selectedPartitions: Seq[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow]
createBucketedReadRDD prints the following INFO message to the logs:
Planning with [numBuckets] buckets
createBucketedReadRDD maps the available files of the input selectedPartitions into PartitionedFiles. For every file, createBucketedReadRDD uses getBlockLocations and getBlockHosts. createBucketedReadRDD then groups the PartitionedFiles by bucket ID.
NOTE: Bucket ID is of the format _0000n, i.e. the bucket ID prefixed with up to four 0s.
createBucketedReadRDD creates a FilePartition for every bucket ID and the PartitionedFiles as grouped earlier. In the end, createBucketedReadRDD creates a FileScanRDD (with the input readFile for the read function and the FilePartitions for every bucket ID as the partitions).
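The grouping step can be sketched as follows (a simplified, self-contained sketch with hypothetical minimal types standing in for Spark's PartitionedFile and FilePartition; it only illustrates grouping files by the _0000n bucket ID encoded in the file names and creating one partition per bucket):

// Hypothetical, simplified stand-ins for Spark's PartitionedFile and FilePartition
final case class PartitionedFile(filePath: String)
final case class FilePartition(index: Int, files: Seq[PartitionedFile])

// Bucket ID is encoded in the file name, e.g. ..._00003.c000.snappy.parquet => bucket 3
val bucketIdPattern = """.*_(\d+)(?:\..*)?$""".r

def bucketIdOf(filePath: String): Int = filePath match {
  case bucketIdPattern(id) => id.toInt
  case _ => 0
}

def groupFilesByBucket(files: Seq[PartitionedFile], numBuckets: Int): Seq[FilePartition] = {
  val byBucketId = files.groupBy(file => bucketIdOf(file.filePath))
  // One FilePartition per bucket ID; empty buckets simply get no files
  (0 until numBuckets).map { id =>
    FilePartition(id, byBucketId.getOrElse(id, Seq.empty))
  }
}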
TIP: Use RDD.toDebugString to see FileScanRDD in the RDD lineage of a bucketed query.

// Create a bucketed table
spark.range(8).write.bucketBy(4, "id").saveAsTable("b1")
scala> sql("desc extended b1").where($"col_name" like "%Bucket%").show
+--------------+---------+-------+
| col_name|data_type|comment|
+--------------+---------+-------+
| Num Buckets| 4| |
|Bucket Columns| [`id`]| |
+--------------+---------+-------+
val bucketedTable = spark.table("b1")
val lineage = bucketedTable.queryExecution.toRdd.toDebugString
scala> println(lineage)
(4) MapPartitionsRDD[26] at toRdd at <console>:26 []
 |  FileScanRDD[25] at toRdd at <console>:26 []
NOTE: createBucketedReadRDD is used exclusively when FileSourceScanExec is requested for inputRDD (for the very first time after which the result is cached).
supportsBatch: Boolean
NOTE: supportsBatch is part of ColumnarBatchScan Contract to enable vectorized decoding.
supportsBatch is enabled (i.e. true) only when the FileFormat (of the HadoopFsRelation) supports vectorized decoding. Otherwise, supportsBatch is disabled (i.e. false).
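Continuing the bucketed-table example above, the table is parquet-backed and ParquetFileFormat supports batch decoding for this flat schema, so the flag is on (supportsBatch exists under this name in Spark 2.x; later versions reworked the columnar API):

scala> println(scan.supportsBatch)
true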
FileSourceScanExec has needsUnsafeRowConversion flag enabled for ParquetFileFormat data sources exclusively.

FileSourceScanExec has vectorTypes…FIXME
needsUnsafeRowConversion: Boolean
NOTE: needsUnsafeRowConversion is part of ColumnarBatchScan Contract to control the name of the variable for an input row while generating the Java source code to consume generated columns or row from a physical operator.
needsUnsafeRowConversion is enabled (i.e. true) when the following conditions all hold:

- The FileFormat of the HadoopFsRelation is ParquetFileFormat
- The spark.sql.parquet.enableVectorizedReader configuration property is enabled (default: true)

Otherwise, needsUnsafeRowConversion is disabled (i.e. false).
NOTE: needsUnsafeRowConversion is used when FileSourceScanExec is executed (and supportsBatch flag is off).
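The configuration property can be inspected (and toggled) at runtime, which indirectly flips needsUnsafeRowConversion for parquet-backed scans:

scala> spark.conf.get("spark.sql.parquet.enableVectorizedReader")
res0: String = true

// Disabling the vectorized reader (e.g. to force the row-based read path)
spark.conf.set("spark.sql.parquet.enableVectorizedReader", false)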
vectorTypes: Option[Seq[String]]
NOTE: vectorTypes is part of ColumnarBatchScan Contract to…FIXME
vectorTypes simply requests the FileFormat of the HadoopFsRelation for vectorTypes.
doExecute(): RDD[InternalRow]
NOTE: doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
doExecute branches off per the supportsBatch flag.

If supportsBatch is on, doExecute creates a WholeStageCodegenExec (with codegenStageId as 0) and executes it right after.

If supportsBatch is off, doExecute creates an unsafeRows RDD to scan over, which differs per the needsUnsafeRowConversion flag.

If the needsUnsafeRowConversion flag is on, doExecute takes the inputRDD and creates a new RDD by applying a function to each partition (using RDD.mapPartitionsWithIndexInternal) that:

- Creates an UnsafeProjection for the schema
- Initializes the UnsafeProjection
- Maps over the rows in a partition iterator using the UnsafeProjection

Otherwise, doExecute simply takes the inputRDD as the unsafeRows RDD (with no changes).

doExecute then takes the numOutputRows metric and creates a new RDD by mapping every element in the unsafeRows RDD and incrementing the numOutputRows metric.
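Put together, the control flow can be sketched as follows (a simplified sketch mirroring the description above, not Spark's exact source; it assumes the surrounding FileSourceScanExec members such as inputRDD, supportsBatch, needsUnsafeRowConversion and schema):

// Simplified sketch of doExecute's branching (assumes the surrounding FileSourceScanExec members)
protected override def doExecute(): RDD[InternalRow] = {
  if (supportsBatch) {
    // Vectorized path: wrap this operator in WholeStageCodegenExec and execute it right away
    WholeStageCodegenExec(this)(codegenStageId = 0).execute()
  } else {
    val numOutputRows = longMetric("numOutputRows")
    // Row-based path: optionally convert the rows to UnsafeRows first
    val unsafeRows =
      if (needsUnsafeRowConversion) {
        inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
          val toUnsafe = UnsafeProjection.create(schema)
          toUnsafe.initialize(index)
          iter.map(toUnsafe)
        }
      } else {
        inputRDD
      }
    // Count every row that leaves the scan
    unsafeRows.map { row =>
      numOutputRows += 1
      row
    }
  }
}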
TIP: Use RDD.toDebugString to review the RDD lineage. With supportsBatch off and needsUnsafeRowConversion on you should see two more RDDs in the RDD lineage.
inputRDD: RDD[InternalRow]
NOTE: inputRDD is a Scala lazy value which is computed once when accessed and cached afterwards.
inputRDD is an input RDD of internal binary rows (i.e. InternalRow) that is used when the FileSourceScanExec physical operator is requested for inputRDDs and execution.

When created, inputRDD requests the HadoopFsRelation for the underlying FileFormat that is in turn requested to build a data reader with partition column values appended (with the input parameters from the properties of the HadoopFsRelation and pushedDownFilters).

In case the HadoopFsRelation has a bucketing specification defined and bucketing support is enabled, inputRDD creates a FileScanRDD with bucketing (with the bucketing specification, the reader, selectedPartitions and the HadoopFsRelation itself). Otherwise, inputRDD uses createNonBucketedReadRDD.
NOTE: createBucketedReadRDD accepts a bucketing specification while createNonBucketedReadRDD does not.
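The logic can be sketched as follows (a simplified sketch based on the description above, not Spark's exact source; it assumes the FileSourceScanExec members relation, requiredSchema, pushedDownFilters and selectedPartitions):

// Simplified sketch of inputRDD (assumes the surrounding FileSourceScanExec members)
private lazy val inputRDD: RDD[InternalRow] = {
  // Ask the FileFormat for a reader that appends partition column values to every row
  val readFile: PartitionedFile => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = requiredSchema,
      filters = pushedDownFilters,
      options = relation.options,
      hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

  relation.bucketSpec match {
    // Bucketed read: one RDD partition per bucket
    case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
      createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    // Non-bucketed read
    case _ =>
      createNonBucketedReadRDD(readFile, selectedPartitions, relation)
  }
}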