[NativeIO] Add UseLastRangeCombiner for default usage of merge on read (#578)

* Add UseLastRangeCombiner for default usage of merge on read

Signed-off-by: zenghua <[email protected]>

* disable dictionary encoding

Signed-off-by: chenxu <[email protected]>

* optimize hash in build_record_batch

Signed-off-by: chenxu <[email protected]>

* optimize combiner

Signed-off-by: zenghua <[email protected]>

* optimize bound checking in add range

Signed-off-by: chenxu <[email protected]>

* replace heap with loser tree (see the illustrative sketch after the commit metadata below)

Signed-off-by: zenghua <[email protected]>

* disable DeltaJoinSuite

Signed-off-by: zenghua <[email protected]>

* fix loser tree impl

Signed-off-by: zenghua <[email protected]>

* fix cargo clippy

Signed-off-by: zenghua <[email protected]>

* local merge test

Signed-off-by: chenxu <[email protected]>

* optimize case for non-partial merge

Signed-off-by: chenxu <[email protected]>

* compaction with file size condition in parallel

Signed-off-by: zenghua <[email protected]>

* optimize build_record_batch for full merge

Signed-off-by: chenxu <[email protected]>

* fix spark suite

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: zenghua <[email protected]>
Signed-off-by: chenxu <[email protected]>
Co-authored-by: zenghua <[email protected]>
Co-authored-by: chenxu <[email protected]>
3 people authored Feb 6, 2025
1 parent 7462f51 commit 4d968b8
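The "replace heap with loser tree" change is on the Rust NativeIO side of this PR and does not appear in the Scala diff below. As a rough, illustrative sketch of the idea (simplified element type and API, not the actual NativeIO code): a loser tree merges k sorted streams with a single leaf-to-root path of roughly log2(k) comparisons per emitted row, whereas a binary heap pays for both a pop and a push.

// Minimal loser-tree k-way merge over sorted iterators (illustrative only).
// tree(0) holds the index of the current overall winner; tree(1 until k)
// hold the losers of the internal matches.
final class LoserTree(sources: Array[BufferedIterator[Int]]) {
  private val k = sources.length
  private val tree = Array.fill(k)(-1) // -1 marks "no contestant yet" during the build

  // An exhausted source always loses against a non-exhausted one.
  private def beats(a: Int, b: Int): Boolean =
    if (!sources(a).hasNext) false
    else if (!sources(b).hasNext) true
    else sources(a).head <= sources(b).head

  // Replay the matches from leaf `source` up to the root.
  private def adjust(source: Int): Unit = {
    var winner = source
    var node = (source + k) / 2
    while (node > 0) {
      if (tree(node) == -1) { tree(node) = winner; return } // park the winner during the build
      if (beats(tree(node), winner)) {
        val prev = winner; winner = tree(node); tree(node) = prev // the new loser stays here
      }
      node /= 2
    }
    tree(0) = winner
  }

  (0 until k).foreach(adjust) // build: one bottom-up pass per source

  def hasNext: Boolean = sources(tree(0)).hasNext

  def next(): Int = {
    val winner = tree(0)
    val value = sources(winner).next()
    adjust(winner) // replace the head with ~log2(k) comparisons
    value
  }
}

// Example: new LoserTree(Array(Iterator(1, 4, 7), Iterator(2, 5, 8), Iterator(3, 6, 9)).map(_.buffered))
// emits 1, 2, 3, ..., 9 in order.

Ties in this sketch simply favor the stored contestant; the actual merge-on-read combiner additionally has to collapse rows with equal primary keys across files (the "use last" semantics in the PR title) instead of emitting both.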
Showing 25 changed files with 1,204 additions and 267 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/maven-test.yml
@@ -187,7 +187,7 @@ jobs:
path: ./rust/target/release/
- name: Build with Maven
run: |
mvn -B test -pl lakesoul-spark -am -Pcross-build -Pparallel-test --file pom.xml -Dtest='!UpdateScalaSuite,!AlterTableByNameSuite,!ReadSuite,!UpdateSQLSuite,!ParquetNativeFilterSuite,!DeleteScalaSuite,!DeleteSQLSuite,!ParquetV2FilterSuite,!ParquetScanSuite,!UpsertSuiteBase,!RBACOperationSuite' -Dsurefire.failIfNoSpecifiedTests=false
mvn -B test -pl lakesoul-spark -am -Pcross-build -Pparallel-test --file pom.xml -Dtest='!UpdateScalaSuite,!AlterTableByNameSuite,!ReadSuite,!UpdateSQLSuite,!ParquetNativeFilterSuite,!DeleteScalaSuite,!DeleteSQLSuite,!ParquetV2FilterSuite,!ParquetScanSuite,!UpsertSuiteBase,!RBACOperationSuite,!DeltaJoinSuite' -Dsurefire.failIfNoSpecifiedTests=false
- name: Generate Report Site
if: always()
run: |
@@ -34,14 +34,23 @@ import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.vectorized.ArrowFakeRowAdaptor
import org.apache.spark.util.{SerializableConfiguration, Utils}
import com.dmetasoul.lakesoul.meta.DBConfig.{LAKESOUL_NON_PARTITION_TABLE_PART_DESC, LAKESOUL_RANGE_PARTITION_SPLITTER}
import com.dmetasoul.lakesoul.meta.DBUtil
import com.dmetasoul.lakesoul.spark.clean.CleanOldCompaction.splitCompactFilePath
import org.apache.spark.sql.lakesoul.DelayedCopyCommitProtocol
import org.apache.spark.sql.execution.datasources.v2.parquet.{NativeParquetCompactionColumnarOutputWriter, NativeParquetOutputWriter}
import org.apache.spark.sql.lakesoul.{DelayedCommitProtocol, DelayedCopyCommitProtocol}
import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType}

import java.util.{Date, UUID}
import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`

/** A helper object for writing FileFormat data out to a location. */
object LakeSoulFileWriter extends Logging {
val MAX_FILE_SIZE_KEY = "max_file_size"
val HASH_BUCKET_ID_KEY = "hash_bucket_id"
val SNAPPY_COMPRESS_RATIO = 3
val COPY_FILE_WRITER_KEY = "copy_file_writer"

/**
* Basic work flow of this command is:
* 1. Driver side setup, including output committer initialization and data source specific
@@ -178,7 +187,11 @@ object LakeSoulFileWriter extends Logging {
val nativeIOEnable = sparkSession.sessionState.conf.getConf(LakeSoulSQLConf.NATIVE_IO_ENABLE)

def nativeWrap(plan: SparkPlan): RDD[InternalRow] = {
if (isCompaction && !isCDC && !isBucketNumChanged && nativeIOEnable) {
if (isCompaction
&& staticBucketId != -1
&& !isCDC
&& !isBucketNumChanged
&& nativeIOEnable) {
plan match {
case withPartitionAndOrdering(_, _, child) =>
return nativeWrap(child)
@@ -203,8 +216,8 @@ object LakeSoulFileWriter extends Logging {
try {
// for compaction, we won't break ordering from batch scan
val (rdd, concurrentOutputWriterSpec) =
if (isCompaction && options.getOrElse("copyCompactedFile", "").nonEmpty) {
val data = Seq(InternalRow(options("copyCompactedFile")))
if (isCompaction && options.getOrElse(COPY_FILE_WRITER_KEY, "false").toBoolean) {
val data = Seq(InternalRow(COPY_FILE_WRITER_KEY))
(sparkSession.sparkContext.parallelize(data), None)
} else if (!isBucketNumChanged && (orderingMatched || isCompaction)) {
(nativeWrap(empty2NullPlan), None)
@@ -410,6 +423,7 @@ object LakeSoulFileWriter extends Logging {
private var recordsInFile: Long = _
private val partValue: Option[String] = options.get("partValue").filter(_ != LAKESOUL_NON_PARTITION_TABLE_PART_DESC)
.map(_.replace(LAKESOUL_RANGE_PARTITION_SPLITTER, "/"))
private val maxFileSize = options.get(MAX_FILE_SIZE_KEY)

/** Given an input row, returns the corresponding `bucketId` */
protected lazy val getBucketId: InternalRow => Int = {
@@ -419,26 +433,56 @@ object LakeSoulFileWriter extends Logging {
row => proj(row).getInt(0)
}

override protected def releaseCurrentWriter(): Unit = {
if (currentWriter != null) {
try {
currentWriter.close()
if (maxFileSize.isDefined) {
currentWriter.asInstanceOf[NativeParquetOutputWriter].flushResult.foreach(result => {
val (partitionDesc, flushResult) = result
val partitionDescList = if (partitionDesc == "-4") {
DBUtil.parsePartitionDesc(options.getOrElse("partValue", LAKESOUL_NON_PARTITION_TABLE_PART_DESC)).asScala.toList
} else {
DBUtil.parsePartitionDesc(partitionDesc).asScala.toList
}
committer.asInstanceOf[DelayedCommitProtocol].addOutputFile(partitionDescList, flushResult.map(_.getFilePath).toList)
})
}
statsTrackers.foreach(_.closeFile(currentWriter.path()))
} finally {
currentWriter = null
}
}
}

private def newOutputWriter(record: InternalRow): Unit = {
recordsInFile = 0
releaseResources()

val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
val suffix = if (bucketSpec.isDefined) {
val bucketIdStr = if (partitionId == -1) {
BucketingUtils.bucketIdToString(getBucketId(record))
val bucketId = if (partitionId == -1) {
getBucketId(record)
} else {
BucketingUtils.bucketIdToString(partitionId)
partitionId
}
taskAttemptContext.getConfiguration.set(HASH_BUCKET_ID_KEY, bucketId.toString)

val bucketIdStr = BucketingUtils.bucketIdToString(bucketId)
f"$bucketIdStr.c$fileCounter%03d" + ext
} else {
f"-c$fileCounter%03d" + ext
}

if (maxFileSize.isDefined) {
taskAttemptContext.getConfiguration.set(MAX_FILE_SIZE_KEY, maxFileSize.get)
}

val currentPath = committer.newTaskTempFile(
taskAttemptContext,
partValue,
suffix)
if (maxFileSize.isDefined) "" else suffix
)

currentWriter = description.outputWriterFactory.newInstance(
path = currentPath,
@@ -473,21 +517,8 @@ object LakeSoulFileWriter extends Logging {
customMetrics: Map[String, SQLMetric] = Map.empty)
extends FileFormatDataWriter(description, taskAttemptContext, committer, customMetrics) {

private val partValue: Option[String] = options.get("partValue").filter(_ != LAKESOUL_NON_PARTITION_TABLE_PART_DESC)
.map(_.replace(LAKESOUL_RANGE_PARTITION_SPLITTER, "/"))

/** Given an input row, returns the corresponding `bucketId` */
protected lazy val getSrcPath: InternalRow => String = {
row => row.get(0, StringType).asInstanceOf[String]
}

override def write(record: InternalRow): Unit = {
val dstPath = committer.newTaskTempFile(
taskAttemptContext,
partValue,
getSrcPath(record))

statsTrackers.foreach(_.newFile(dstPath))
logInfo("copy file")
}
}
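Taken together, the writer-side changes in this file set up a contract that is easy to miss when reading hunk by hunk: when MAX_FILE_SIZE_KEY is set, the task asks the committer for a bare directory (empty file suffix), the native writer rolls output files under that prefix, and releaseCurrentWriter feeds the flushed file list back through DelayedCommitProtocol.addOutputFile. The condensed sketch below restates that flow with stand-in types; the paths, sizes and file names are made up for illustration.

// Illustrative stand-ins only; the real types are NativeIOWriter.FlushResult and
// DelayedCommitProtocol from the diff above.
object SizeLimitedWriteSketch {
  val MAX_FILE_SIZE_KEY = "max_file_size"

  final case class FlushResult(filePath: String, fileSize: Long)

  final class Committer {
    private val added = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
    def addOutputFile(partitionDesc: String, files: Seq[String]): Unit =
      files.foreach(f => added += partitionDesc -> f)
    def committedFiles: Seq[(String, String)] = added.toList
  }

  def main(args: Array[String]): Unit = {
    val options = Map(MAX_FILE_SIZE_KEY -> (128L * 1024 * 1024).toString)
    val committer = new Committer

    // 1. With max_file_size set, newTaskTempFile is called with an empty suffix,
    //    so the writer receives a partition directory instead of one file path.
    val writerPath =
      if (options.contains(MAX_FILE_SIZE_KEY)) "/lakesoul/table_a/date=2025-02-06"
      else "/lakesoul/table_a/date=2025-02-06/part-00000.parquet"

    // 2. The native writer splits output under that prefix once the size limit is
    //    reached and reports every file it actually wrote (hypothetical names).
    val flushed = Seq(
      FlushResult(s"$writerPath/compact_0001.parquet", 134217728L),
      FlushResult(s"$writerPath/compact_0002.parquet", 98566144L))

    // 3. releaseCurrentWriter registers those files with the commit protocol.
    committer.addOutputFile("date=2025-02-06", flushed.map(_.filePath))
    committer.committedFiles.foreach(println)
  }
}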

@@ -28,6 +28,7 @@ import org.apache.spark.sql.sources.{EqualTo, Filter, Not}
import org.apache.spark.sql.lakesoul._
import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf.{COMPACTION_TASK, SCAN_FILE_NUMBER_LIMIT}
import org.apache.spark.sql.lakesoul.utils.{SparkUtil, TableInfo, TimestampFormatter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -412,15 +413,34 @@ case class OnePartitionMergeBucketScan(sparkSession: SparkSession,
val fileWithBucketId = groupByPartition.head._2
.groupBy(_.fileBucketId).map(f => (f._1, f._2.toArray))

val fileNumLimit = options.getOrDefault(SCAN_FILE_NUMBER_LIMIT.key, Int.MaxValue.toString).toInt
val isCompactionTask = options.getOrDefault(COMPACTION_TASK.key, COMPACTION_TASK.defaultValueString).toBoolean

Seq.tabulate(bucketNum) { bucketId =>
var files = fileWithBucketId.getOrElse(bucketId, Array.empty)
val isSingleFile = files.length == 1
var groupedFiles = if (fileNumLimit < Int.MaxValue && isCompactionTask) {
val groupedFiles = new ArrayBuffer[Array[MergePartitionedFile]]
for (i <- files.indices by fileNumLimit) {
groupedFiles += files.slice(i, i + fileNumLimit)
}
groupedFiles.toArray
} else {
Array(files)
}

if (!isSingleFile) {
val versionFiles = for (version <- files.indices) yield files(version).copy(writeVersion = version + 1)
files = versionFiles.toArray
var allPartitionIsSingleFile = true
var isSingleFile = false

for (index <- groupedFiles.indices) {
isSingleFile = groupedFiles(index).length == 1
if (!isSingleFile) {
val versionFiles = for (elem <- groupedFiles(index).indices) yield groupedFiles(index)(elem).copy(writeVersion = elem)
groupedFiles(index) = versionFiles.toArray
allPartitionIsSingleFile = false
}
}
MergeFilePartition(bucketId, Array(files), isSingleFile)

MergeFilePartition(bucketId, groupedFiles, allPartitionIsSingleFile)
}
}
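For compaction tasks, the new scan logic above caps how many files a single merge task reads by slicing each bucket's file list into groups of at most SCAN_FILE_NUMBER_LIMIT files and re-numbering writeVersion within each group. The same grouping, extracted into a standalone sketch (FileSlice is a simplified stand-in for MergePartitionedFile):

object ScanGroupingSketch {
  // Simplified stand-in: only the fields the grouping logic touches.
  final case class FileSlice(path: String, writeVersion: Int)

  // Mirror of the grouping in OnePartitionMergeBucketScan above: groups of at most
  // `fileNumLimit` files per compaction task, versions assigned within each group.
  def groupBucketFiles(files: Array[FileSlice], fileNumLimit: Int): Array[Array[FileSlice]] = {
    val grouped =
      if (fileNumLimit < Int.MaxValue) files.grouped(fileNumLimit).toArray
      else Array(files)
    grouped.map { group =>
      if (group.length == 1) group // a single file needs no merge, keep it untouched
      else group.zipWithIndex.map { case (f, version) => f.copy(writeVersion = version) }
    }
  }

  // Example: five files with a limit of 2 produce groups of sizes 2, 2 and 1.
  // groupBucketFiles(Array.tabulate(5)(i => FileSlice(s"f$i.parquet", 0)), 2)
}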

@@ -31,7 +31,8 @@ class NativeParquetFileFormat extends FileFormat

if (options.getOrElse("isCompaction", "false").toBoolean &&
!options.getOrElse("isCDC", "false").toBoolean &&
!options.getOrElse("isBucketNumChanged", "false").toBoolean
!options.getOrElse("isBucketNumChanged", "false").toBoolean &&
options.contains("staticBucketId")
) {
new OutputWriterFactory {
override def newInstance(
@@ -5,6 +5,7 @@
package org.apache.spark.sql.execution.datasources.v2.parquet

import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter
import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter.FlushResult
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.types.pojo.Schema
@@ -18,12 +19,18 @@ import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{GlutenUtils, NativeIOUtils}

import java.util
import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.collection.mutable

class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZoneId: String, context: TaskAttemptContext) extends OutputWriter {

val NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE: Int = SQLConf.get.getConf(LakeSoulSQLConf.NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE)

private var recordCount = 0

var flushResult: mutable.Map[String, util.List[FlushResult]] = mutable.Map.empty

val arrowSchema: Schema = ArrowUtils.toArrowSchema(dataSchema, timeZoneId)

protected val nativeIOWriter: NativeIOWriter = new NativeIOWriter(arrowSchema)
@@ -32,7 +39,11 @@ class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZo

GlutenUtils.setArrowAllocator(nativeIOWriter)
nativeIOWriter.setRowGroupRowNumber(NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE)
nativeIOWriter.addFile(path)
if (path.endsWith(".parquet")) {
nativeIOWriter.addFile(path)
} else {
nativeIOWriter.withPrefix(path)
}

NativeIOUtils.setNativeIOOptions(nativeIOWriter, NativeIOUtils.getNativeIOOptions(context, new Path(path)))

@@ -61,7 +72,7 @@ class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZo
recordWriter.finish()

nativeIOWriter.write(root)
nativeIOWriter.flush()
flushResult = nativeIOWriter.flush().asScala

recordWriter.reset()
root.close()
@@ -90,26 +90,39 @@ class DelayedCommitProtocol(jobId: String,
}

override def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
val filename = getFileName(taskContext, ext)
val partitionValues = dir.map(parsePartitions).getOrElse(List.empty[(String, String)])
val unescapedDir = if (partitionValues.nonEmpty) {
Some(partitionValues.map(partitionValue => partitionValue._1 + "=" + partitionValue._2).mkString("/"))
} else {
dir
}
val relativePath = randomPrefixLength.map { prefixLength =>
getRandomPrefix(prefixLength) // Generate a random prefix as a first choice
}.orElse {
// or else write into the partition unescaped directory if it is partitioned
if (ext.isEmpty) {
unescapedDir
}.map { subDir =>
new Path(subDir, filename)
}.getOrElse(new Path(filename)) // or directly write out to the output path

val absolutePath = new Path(path, relativePath).toUri.toString
//returns the absolute path to the file
addedFiles.append((partitionValues, absolutePath))
absolutePath
.map(new Path(path, _))
.getOrElse(new Path(path))
.toUri.toString
} else {
val filename = getFileName(taskContext, ext)

val relativePath = randomPrefixLength.map { prefixLength =>
getRandomPrefix(prefixLength) // Generate a random prefix as a first choice
}.orElse {
// or else write into the partition unescaped directory if it is partitioned
unescapedDir
}.map { subDir =>
new Path(subDir, filename)
}.getOrElse(new Path(filename)) // or directly write out to the output path


val absolutePath = new Path(path, relativePath).toUri.toString
//returns the absolute path to the file
addedFiles.append((partitionValues, absolutePath))
absolutePath
}
}

def addOutputFile(partitionValues: List[(String, String)], files: List[String]): Unit = {
files.foreach(file => addedFiles.append((partitionValues, file)))
}

override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
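After this change newTaskTempFile has two modes: a non-empty extension still yields (and records) a concrete file path, while an empty extension, used by the size-limited native write, returns only the partition directory under the table path and leaves the concrete files to be registered later via addOutputFile. A toy rendering of the two path shapes (table location, partition value and file name are hypothetical):

import org.apache.hadoop.fs.Path

object TempFilePathSketch {
  // Hypothetical inputs; `path` plays the role of the table root in the diff above.
  val path = "/lakesoul/table_a"
  val unescapedDir: Option[String] = Some("date=2025-02-06")
  val filename = "part-00000-c000.parquet"

  // ext == "": size-limited write -> only the partition directory is returned.
  val prefixOnly: String =
    unescapedDir.map(new Path(path, _)).getOrElse(new Path(path)).toUri.toString
  // "/lakesoul/table_a/date=2025-02-06"

  // ext != "": regular write -> a concrete file path, appended to addedFiles right away.
  val concreteFile: String =
    new Path(unescapedDir.map(new Path(path, _)).getOrElse(new Path(path)), filename).toUri.toString
  // "/lakesoul/table_a/date=2025-02-06/part-00000-c000.parquet"
}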
@@ -23,31 +23,16 @@ import scala.util.Random
/**
* Writes out the files to `path` and returns a list of them in `addedStatuses`.
*/
class DelayedCopyCommitProtocol(jobId: String,
class DelayedCopyCommitProtocol(srcFiles: Seq[DataFileInfo],
jobId: String,
dstPath: String,
randomPrefixLength: Option[Int])
extends DelayedCommitProtocol(jobId, dstPath, randomPrefixLength)
with Serializable with Logging {

@transient private var copyFiles: ArrayBuffer[(String, String)] = _

override def setupJob(jobContext: JobContext): Unit = {

}

override def abortJob(jobContext: JobContext): Unit = {
// TODO: Best effort cleanup
}

override def setupTask(taskContext: TaskAttemptContext): Unit = {
copyFiles = new ArrayBuffer[(String, String)]
}


override def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
val (srcCompactDir, srcBasePath) = splitCompactFilePath(ext)
copyFiles += dir.getOrElse("-5") -> ext
new Path(dstPath, srcBasePath).toString
throw new UnsupportedOperationException(
s"$this does not support adding files with an absolute path")
}

override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
@@ -57,15 +42,14 @@ class DelayedCopyCommitProtocol(jobId: String,

override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {

if (copyFiles.nonEmpty) {
val fs = new Path(copyFiles.head._2).getFileSystem(taskContext.getConfiguration)
val statuses = copyFiles.map { f =>
val (partitionDesc, srcPath) = f
val (srcCompactDir, srcBasePath) = splitCompactFilePath(srcPath)
if (srcFiles.nonEmpty) {
val fs = new Path(srcFiles.head.path).getFileSystem(taskContext.getConfiguration)
val statuses = srcFiles.map { srcFile =>
val (srcCompactDir, srcBasePath) = splitCompactFilePath(srcFile.path)
val dstFile = new Path(dstPath, srcBasePath)
FileUtil.copy(fs, new Path(srcPath), fs, new Path(dstPath, srcBasePath), false, taskContext.getConfiguration)
FileUtil.copy(fs, new Path(srcFile.path), fs, new Path(dstPath, srcBasePath), false, taskContext.getConfiguration)
val status = fs.getFileStatus(dstFile)
DataFileInfo(partitionDesc, fs.makeQualified(dstFile).toString, "add", status.getLen, status.getModificationTime)
DataFileInfo(srcFile.range_partitions, fs.makeQualified(dstFile).toString, "add", status.getLen, status.getModificationTime)
}

new TaskCommitMessage(statuses)
(Diffs for the remaining changed files are not shown.)
