[GLUTEN-8497][VL] A bad test case that fails columnar table cache query (#8498)
zhztheplayer authored Jan 13, 2025
1 parent 318bb21 commit ecda35b
Showing 2 changed files with 41 additions and 3 deletions.
@@ -24,8 +24,11 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.types.{LongType, Metadata, MetadataBuilder, StructType}
 import org.apache.spark.storage.StorageLevel
 
+import scala.collection.JavaConverters._
+
 class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
   override protected val resourcePath: String = "/tpch-data-parquet"
   override protected val fileFormat: String = "parquet"
@@ -55,7 +58,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
     )
   }
 
-  test("input columnar batch") {
+  test("Input columnar batch") {
     TPCHTables.map(_.name).foreach {
       table =>
         runQueryAndCompare(s"SELECT * FROM $table", cache = true) {
@@ -64,7 +67,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
     }
   }
 
-  test("input columnar batch and column pruning") {
+  test("Input columnar batch and column pruning") {
     val expected = sql("SELECT l_partkey FROM lineitem").collect()
     val cached = sql("SELECT * FROM lineitem").cache()
     try {
@@ -85,7 +88,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
     }
   }
 
-  test("input vanilla Spark columnar batch") {
+  test("Input vanilla Spark columnar batch") {
     withSQLConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key -> "false") {
       val df = spark.table("lineitem")
       val expected = df.collect()
@@ -98,6 +101,40 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
     }
   }
 
+  // TODO: Fix this case. See https://github.com/apache/incubator-gluten/issues/8497.
+  testWithSpecifiedSparkVersion("Input fallen back vanilla Spark columnar scan", Some("3.3")) {
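+    // withId attaches a Parquet field ID ("parquet.field.id") to a column's metadata;
+    // field-ID matching is the unsupported metadata the comment below refers to.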
+    def withId(id: Int): Metadata =
+      new MetadataBuilder().putLong("parquet.field.id", id).build()
+
+    withTempDir {
+      dir =>
+        val readSchema =
+          new StructType()
+            .add("l_orderkey_read", LongType, true, withId(1))
+        val writeSchema =
+          new StructType()
+            .add("l_orderkey_write", LongType, true, withId(1))
+        withSQLConf("spark.sql.parquet.fieldId.read.enabled" -> "true") {
+          // Write a table with metadata that the Gluten Velox backend doesn't support,
+          // to emulate the scenario where a Spark columnar scan is not offload-able and
+          // so falls back, then the user tries to cache it.
+          spark
+            .createDataFrame(
+              spark.sql("select l_orderkey from lineitem").collect().toList.asJava,
+              writeSchema)
+            .write
+            .mode("overwrite")
+            .parquet(dir.getCanonicalPath)
+          val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath)
+          df.cache()
+          // FIXME: The following call will throw since ColumnarCachedBatchSerializer will be
+          // confused by the input vanilla Parquet scan when its #convertColumnarBatchToCachedBatch
+          // method is called.
+          assertThrows[Exception](df.collect())
+        }
+    }
+  }
+
test("CachedColumnarBatch serialize and deserialize") {
val df = spark.table("lineitem")
val expected = df.collect()
Expand Down
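To reproduce the failure outside the test harness, the new test can be recast as a spark-shell session against a Gluten-enabled Spark 3.3+ build (Parquet field IDs require 3.3). A sketch follows; the /tmp/fieldid path and the k_write/k_read column names are illustrative only, and the final collect() is expected to throw only while issue #8497 remains open:

import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructType}

// Write a Parquet file whose single column carries field ID 1.
val id = new MetadataBuilder().putLong("parquet.field.id", 1).build()
val writeSchema = new StructType().add("k_write", LongType, true, id)
val rows = (1L to 100L).map(Row(_)).asJava
spark.createDataFrame(rows, writeSchema).write.mode("overwrite").parquet("/tmp/fieldid")

// Read it back under a different column name matched by field ID, forcing the
// field-ID code path that the Velox scan cannot offload, so the scan falls back.
spark.conf.set("spark.sql.parquet.fieldId.read.enabled", "true")
val readSchema = new StructType().add("k_read", LongType, true, id)
val df = spark.read.schema(readSchema).parquet("/tmp/fieldid")

df.cache()
df.collect() // throws inside ColumnarCachedBatchSerializer#convertColumnarBatchToCachedBatch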
@@ -77,6 +77,7 @@ object TransitionGraph {
     }
   }
 
+  // TODO: Consolidate transition graph's cost model with RAS cost model.
   private object TransitionCostModel extends FloydWarshallGraph.CostModel[Transition] {
     override def zero(): TransitionCost = TransitionCost(0, Nil)
     override def costOf(transition: Transition): TransitionCost = {
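The added TODO concerns the cost model consumed by the transition graph's Floyd-Warshall search. For reference, below is a minimal, self-contained sketch of the all-pairs relaxation such a search performs. It mirrors only the zero()/costOf() shape visible in the hunk; the Int node IDs, Long costs, and all names are illustrative assumptions, not Gluten's actual FloydWarshallGraph API:

object FloydWarshallSketch {
  // Cheapest path cost between every pair of nodes in 0 until n.
  // edge(i, j) returns the direct transition cost, or None if no edge exists.
  def allPairs(n: Int, edge: (Int, Int) => Option[Long]): Array[Array[Option[Long]]] = {
    // Start from direct edges; a node reaches itself at zero cost (cf. zero() above).
    val dist = Array.tabulate(n, n)((i, j) => if (i == j) Some(0L) else edge(i, j))
    for (k <- 0 until n; i <- 0 until n; j <- 0 until n) {
      (dist(i)(k), dist(k)(j)) match {
        // Relax: route i -> k -> j if it beats the best path found so far.
        case (Some(ik), Some(kj)) if dist(i)(j).forall(_ > ik + kj) =>
          dist(i)(j) = Some(ik + kj)
        case _ =>
      }
    }
    dist
  }
}

// Usage: costs(a)(b) is then the cheapest chain of transitions from a to b, e.g.
// val costs = FloydWarshallSketch.allPairs(3, (i, j) => if (j == i + 1) Some(1L) else None)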
