Skip to content

Commit

Permalink
[Spark] Optimize fileInfo Scan (lakesoul-io#549)
Browse files (browse the repository at this point in the history)
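* optimize fileInfo Scan: build a qualified-path -> DataFileInfo lookup map once, instead of searching the fileInfo sequence for every file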

Signed-off-by: zenghua <[email protected]>

* clean up code

Signed-off-by: zenghua <[email protected]>

---------

Signed-off-by: zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>
  • Loading branch information
Ceng23333 and zenghua authored Oct 11, 2024
1 parent 068f935 commit bb19636
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -265,16 +265,25 @@ abstract class MergeDeltaParquetScan(sparkSession: SparkSession,
})
}

val fileInfoSeq = if (isStreaming) newFileIndex.getFileInfoForPartitionVersion() else fileInfo
val dataInfoPath = fileInfoSeq.mkString(",")
val fs = partition.files.head.getPath
.getFileSystem(sparkSession.sessionState.newHadoopConf())
val pathToFileInfoMap = fileInfoSeq.map(f => fs.makeQualified(new Path(f.path)).toString -> f).toMap

partition.files.flatMap { file =>
val filePath = file.getPath
val qualifiedPath = fs.makeQualified(filePath).toString

val touchedFileInfo = pathToFileInfoMap.getOrElse(qualifiedPath, throw LakeSoulErrors.filePathNotFoundException(qualifiedPath, dataInfoPath))

MergePartitionedFileUtil.notSplitFiles(
sparkSession,
file,
filePath,
partitionValues,
tableInfo,
fileInfo = if (isStreaming) newFileIndex.getFileInfoForPartitionVersion() else fileInfo,
touchedFileInfo,
requestFilesSchemaMap,
readDataSchema,
readPartitionSchema.fieldNames)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object MergePartitionedFileUtil {
filePath: Path,
partitionValues: InternalRow,
tableInfo: TableInfo,
fileInfo: Seq[DataFileInfo],
touchedFileInfo: DataFileInfo,
requestFilesSchemaMap: Map[String, StructType],
requestDataSchema: StructType,
requestPartitionFields: Array[String]): Seq[MergePartitionedFile] = {
Expand All @@ -28,7 +28,7 @@ object MergePartitionedFileUtil {
filePath,
partitionValues,
tableInfo,
fileInfo,
touchedFileInfo,
requestFilesSchemaMap,
requestDataSchema,
requestPartitionFields))
Expand All @@ -39,7 +39,7 @@ object MergePartitionedFileUtil {
filePath: Path,
partitionValues: InternalRow,
tableInfo: TableInfo,
fileInfo: Seq[DataFileInfo],
touchedFileInfo: DataFileInfo,
requestFilesSchemaMap: Map[String, StructType],
requestDataSchema: StructType,
requestPartitionFields: Array[String]): MergePartitionedFile = {
Expand All @@ -49,8 +49,6 @@ object MergePartitionedFileUtil {
.getFileSystem(sparkSession.sessionState.newHadoopConf())
val filePathStr = fs
.makeQualified(filePath).toString
val touchedFileInfo = fileInfo.find(f => filePathStr.equals(fs.makeQualified(new Path(f.path)).toString))
.getOrElse(throw LakeSoulErrors.filePathNotFoundException(filePathStr, fileInfo.mkString(",")))

val touchedFileSchema = requestFilesSchemaMap(touchedFileInfo.range_version).fieldNames

Expand Down

0 comments on commit bb19636

Please sign in to comment.