From 92d3e7644ddcfdf978a56a55006295140652949c Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 10 Jan 2025 11:56:28 +0800
Subject: [PATCH 1/7] fixup

---
 .../gluten/extension/columnar/transition/TransitionGraph.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala
index 2733ed9f4ff6..ef08a34d5615 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala
@@ -77,6 +77,7 @@ object TransitionGraph {
     }
   }
 
+  // TODO: Consolidate transition graph's cost model with RAS cost model.
   private object TransitionCostModel extends FloydWarshallGraph.CostModel[Transition] {
     override def zero(): TransitionCost = TransitionCost(0, Nil)
     override def costOf(transition: Transition): TransitionCost = {

From 28fc0d9a27f09deb507dfb69083e4580310fac18 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 10 Jan 2025 16:19:09 +0800
Subject: [PATCH 2/7] fixup

---
 .../execution/VeloxColumnarCacheSuite.scala | 45 +++++++++++++++++--
 1 file changed, 42 insertions(+), 3 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
index e9151ad84ab6..02684817cbfb 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
@@ -24,8 +24,13 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{LongType, Metadata, MetadataBuilder, StructType}
 import org.apache.spark.storage.StorageLevel
 
+import scala.collection.JavaConverters._
+
 class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
   override protected val resourcePath: String = "/tpch-data-parquet"
   override protected val fileFormat: String = "parquet"
@@ -55,7 +60,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
     )
   }
 
-  test("input columnar batch") {
+  test("Input columnar batch") {
     TPCHTables.map(_.name).foreach {
       table =>
         runQueryAndCompare(s"SELECT * FROM $table", cache = true) {
@@ -64,7 +69,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
     }
   }
 
-  test("input columnar batch and column pruning") {
+  test("Input columnar batch and column pruning") {
     val expected = sql("SELECT l_partkey FROM lineitem").collect()
     val cached = sql("SELECT * FROM lineitem").cache()
     try {
@@ -85,7 +90,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
     }
   }
 
-  test("input vanilla Spark columnar batch") {
+  test("Input vanilla Spark columnar batch") {
     withSQLConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key -> "false") {
       val df = spark.table("lineitem")
       val expected = df.collect()
@@ -98,6 +103,40 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
     }
   }
 
+  // TODO: Fix this case.
+  ignore("Input fallen back vanilla Spark columnar scan") {
+    def withId(id: Int): Metadata =
+      new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+    withTempDir {
+      dir =>
+        val readSchema =
+          new StructType()
+            .add("l_orderkey_read", LongType, true, withId(1))
+        val writeSchema =
+          new StructType()
+            .add("l_orderkey_write", LongType, true, withId(1))
+        withSQLConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") {
+          // Write a table with metadata information that Gluten Velox backend doesn't support,
+          // to emulate the scenario that a Spark columnar scan is not offload-able so fallen back.
+          spark
+            .createDataFrame(
+              spark.sql("select l_orderkey from lineitem").collect().toList.asJava,
+              writeSchema)
+            .write
+            .mode("overwrite")
+            .parquet(dir.getCanonicalPath)
+          val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath)
+          df.cache()
+          df.explain()
+          // FIXME: The following call will throw since the vanilla Parquet scan could confuse
+          // ColumnarCachedBatchSerializer by calling its #convertColumnarBatchToCachedBatch
+          // method.
+          assert(df.collect().nonEmpty)
+        }
+    }
+  }
+
   test("CachedColumnarBatch serialize and deserialize") {
     val df = spark.table("lineitem")
     val expected = df.collect()

From 4adcc58f29a8b8c071527f4aab0fc54a5067fc04 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 10 Jan 2025 16:20:27 +0800
Subject: [PATCH 3/7] fixup

---
 .../org/apache/gluten/execution/VeloxColumnarCacheSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
index 02684817cbfb..cf5e4f574ae6 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
@@ -118,7 +118,8 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
             .add("l_orderkey_write", LongType, true, withId(1))
         withSQLConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") {
           // Write a table with metadata information that Gluten Velox backend doesn't support,
-          // to emulate the scenario that a Spark columnar scan is not offload-able so fallen back.
+          // to emulate the scenario that a Spark columnar scan is not offload-able so fallen back,
+          // then user commands to cache it.
           spark
             .createDataFrame(
               spark.sql("select l_orderkey from lineitem").collect().toList.asJava,

From af48249a4d8388ac2b68ac3d4901b8db37244761 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 10 Jan 2025 16:31:27 +0800
Subject: [PATCH 4/7] fixup

---
 .../org/apache/gluten/execution/VeloxColumnarCacheSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
index cf5e4f574ae6..eeb42e1a2963 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
@@ -119,7 +119,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
         withSQLConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") {
           // Write a table with metadata information that Gluten Velox backend doesn't support,
           // to emulate the scenario that a Spark columnar scan is not offload-able so fallen back,
-          // then user commands to cache it.
+          // then user happens to cache it.
           spark
             .createDataFrame(
               spark.sql("select l_orderkey from lineitem").collect().toList.asJava,

From 87348028adc75fdb36a5cd76e4036b5e43c425a5 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 10 Jan 2025 16:32:26 +0800
Subject: [PATCH 5/7] fixup

---
 .../apache/gluten/execution/VeloxColumnarCacheSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
index eeb42e1a2963..c414cd959e0c 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
@@ -130,9 +130,9 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
           val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath)
           df.cache()
           df.explain()
-          // FIXME: The following call will throw since the vanilla Parquet scan could confuse
-          // ColumnarCachedBatchSerializer by calling its #convertColumnarBatchToCachedBatch
-          // method.
+          // FIXME: The following call will throw since ColumnarCachedBatchSerializer will be
+          // confused by the input vanilla Parquet scan when its #convertColumnarBatchToCachedBatch
+          // method is called.
           assert(df.collect().nonEmpty)
         }
     }

From 9ac246b9f9a3ebdac5e59663f68c498901fb969a Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 10 Jan 2025 16:34:06 +0800
Subject: [PATCH 6/7] fixup

---
 .../org/apache/gluten/execution/VeloxColumnarCacheSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
index c414cd959e0c..7a4192f3baf4 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
@@ -119,7 +119,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
         withSQLConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") {
           // Write a table with metadata information that Gluten Velox backend doesn't support,
           // to emulate the scenario that a Spark columnar scan is not offload-able so fallen back,
-          // then user happens to cache it.
+          // then user tries to cache it.
           spark
             .createDataFrame(
               spark.sql("select l_orderkey from lineitem").collect().toList.asJava,

From d66c3f5a58015e4ff27a41e5395beeb18828d967 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 10 Jan 2025 16:50:52 +0800
Subject: [PATCH 7/7] fixup

---
 .../gluten/execution/VeloxColumnarCacheSuite.scala | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
index 7a4192f3baf4..8c7be883bbbc 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
@@ -24,8 +24,6 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{LongType, Metadata, MetadataBuilder, StructType}
 import org.apache.spark.storage.StorageLevel
 
@@ -103,10 +101,10 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
     }
   }
 
-  // TODO: Fix this case.
-  ignore("Input fallen back vanilla Spark columnar scan") {
+  // TODO: Fix this case. See https://github.com/apache/incubator-gluten/issues/8497.
+  testWithSpecifiedSparkVersion("Input fallen back vanilla Spark columnar scan", Some("3.3")) {
     def withId(id: Int): Metadata =
-      new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+      new MetadataBuilder().putLong("parquet.field.id", id).build()
 
     withTempDir {
       dir =>
@@ -116,7 +114,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
         val writeSchema =
           new StructType()
             .add("l_orderkey_write", LongType, true, withId(1))
-        withSQLConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") {
+        withSQLConf("spark.sql.parquet.fieldId.read.enabled" -> "true") {
           // Write a table with metadata information that Gluten Velox backend doesn't support,
           // to emulate the scenario that a Spark columnar scan is not offload-able so fallen back,
           // then user tries to cache it.
@@ -129,11 +127,10 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
             .parquet(dir.getCanonicalPath)
           val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath)
           df.cache()
-          df.explain()
           // FIXME: The following call will throw since ColumnarCachedBatchSerializer will be
           // confused by the input vanilla Parquet scan when its #convertColumnarBatchToCachedBatch
           // method is called.
-          assert(df.collect().nonEmpty)
+          assertThrows[Exception](df.collect())
         }
     }
   }