diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
index e9151ad84ab6..8c7be883bbbc 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
@@ -24,8 +24,11 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.types.{LongType, Metadata, MetadataBuilder, StructType}
 import org.apache.spark.storage.StorageLevel
 
+import scala.collection.JavaConverters._
+
 class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
   override protected val resourcePath: String = "/tpch-data-parquet"
   override protected val fileFormat: String = "parquet"
@@ -55,7 +58,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
     )
   }
 
-  test("input columnar batch") {
+  test("Input columnar batch") {
     TPCHTables.map(_.name).foreach {
       table =>
         runQueryAndCompare(s"SELECT * FROM $table", cache = true) {
@@ -64,7 +67,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
     }
   }
 
-  test("input columnar batch and column pruning") {
+  test("Input columnar batch and column pruning") {
     val expected = sql("SELECT l_partkey FROM lineitem").collect()
     val cached = sql("SELECT * FROM lineitem").cache()
     try {
@@ -85,7 +88,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
     }
   }
 
-  test("input vanilla Spark columnar batch") {
+  test("Input vanilla Spark columnar batch") {
     withSQLConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key -> "false") {
       val df = spark.table("lineitem")
       val expected = df.collect()
@@ -98,6 +101,40 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
     }
   }
 
+  // TODO: Fix this case. See https://github.com/apache/incubator-gluten/issues/8497.
+  testWithSpecifiedSparkVersion("Input fallen back vanilla Spark columnar scan", Some("3.3")) {
+    def withId(id: Int): Metadata =
+      new MetadataBuilder().putLong("parquet.field.id", id).build()
+
+    withTempDir {
+      dir =>
+        val readSchema =
+          new StructType()
+            .add("l_orderkey_read", LongType, true, withId(1))
+        val writeSchema =
+          new StructType()
+            .add("l_orderkey_write", LongType, true, withId(1))
+        withSQLConf("spark.sql.parquet.fieldId.read.enabled" -> "true") {
+          // Write a table with metadata information that the Gluten Velox backend doesn't support,
+          // to emulate the scenario where a Spark columnar scan is not offload-able and falls back,
+          // then the user tries to cache it.
+          spark
+            .createDataFrame(
+              spark.sql("select l_orderkey from lineitem").collect().toList.asJava,
+              writeSchema)
+            .write
+            .mode("overwrite")
+            .parquet(dir.getCanonicalPath)
+          val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath)
+          df.cache()
+          // FIXME: The following call will throw because ColumnarCachedBatchSerializer gets
+          // confused by the vanilla Parquet scan's input batches when its
+          // #convertColumnarBatchToCachedBatch method is called.
+          assertThrows[Exception](df.collect())
+        }
+    }
+  }
+
   test("CachedColumnarBatch serialize and deserialize") {
     val df = spark.table("lineitem")
     val expected = df.collect()
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala
index 2733ed9f4ff6..ef08a34d5615 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala
@@ -77,6 +77,7 @@ object TransitionGraph {
     }
   }
 
+  // TODO: Consolidate transition graph's cost model with RAS cost model.
   private object TransitionCostModel extends FloydWarshallGraph.CostModel[Transition] {
     override def zero(): TransitionCost = TransitionCost(0, Nil)
     override def costOf(transition: Transition): TransitionCost = {
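
For reference, a minimal standalone sketch of the parquet.field.id scenario the new test emulates, assuming a plain Spark 3.3+ spark-shell without Gluten; it is not part of the patch, and the path, column names, and sample data are illustrative only.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructType}

// Attach field id 1 to a column and write it out under one name.
val idMeta = new MetadataBuilder().putLong("parquet.field.id", 1).build()
spark.conf.set("spark.sql.parquet.fieldId.write.enabled", "true")
spark.range(10)
  .select(col("id").as("k_write", idMeta))
  .write
  .mode("overwrite")
  .parquet("/tmp/fieldid_demo") // illustrative path

// Read the file back under a different name; the column is resolved by field id, not by name.
spark.conf.set("spark.sql.parquet.fieldId.read.enabled", "true")
val readSchema = new StructType().add("k_read", LongType, true, idMeta)
val df = spark.read.schema(readSchema).parquet("/tmp/fieldid_demo")
df.show() // with Gluten's Velox backend this scan is expected to fall back to vanilla Spark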