
whole stage generated code for a simple query #23

Open

bithw1 opened this issue Jan 5, 2020 · 6 comments
bithw1 commented Jan 5, 2020

Hi @bartosz25,

I have a simple test case with which I would like to see the whole-stage generated code:

  test("whole stage code gen test") {
    val spark = SparkSession.builder().enableHiveSupport().master("local").appName("whole stage code gen test").getOrCreate()
    import spark.implicits._
    Seq(("A", 1), ("B", 2), ("C", 3)).toDF("name", "age").createOrReplaceTempView("t")
    val df = spark.sql(
      """
        select name, age from t  where age > 2
      """.stripMargin(' '))
    df.explain(true)
    df.show()
    spark.stop()
  }

The following is a snippet of the generated code:

final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
  private Object[] references;
  private scala.collection.Iterator[] inputs;
  private scala.collection.Iterator localtablescan_input_0;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] filter_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];

  public GeneratedIteratorForCodegenStage1(Object[] references) {
    this.references = references;
  }

  public void init(int index, scala.collection.Iterator[] inputs) {
    partitionIndex = index;
    this.inputs = inputs;
    localtablescan_input_0 = inputs[0]; // LocalTableScanExec
    filter_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 32);
    filter_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 64);
  }

I don't understand the variable filter_mutableStateArray_0. It is created for the FilterExec (the variable name starts with filter), but I think it should be created for the ProjectExec and therefore be named project_mutableStateArray_0 (it is of type UnsafeRowWriter).

I am not sure why this variable is created for the FilterExec. Could you please have a look? Thanks!

@bartosz25 (Owner)

Hi @bithw1

I will take a look at your example today or tomorrow. Thank you.

@bartosz25 (Owner)

Re @bithw1

I didn't manage to reproduce the same generated code. Do you have any specific setup?
The code I get every time looks like:

/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 009 */
/* 010 */   public SpecificUnsafeProjection(Object[] references) {
/* 011 */     this.references = references;
/* 012 */     mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 64);
/* 013 */
/* 014 */   }
/* 015 */
/* 016 */   public void initialize(int partitionIndex) {
/* 017 */
/* 018 */   }

bithw1 (Author) commented Jan 7, 2020

Thanks @bartosz25.

I printed out the generated code from WholeStageCodegenExec#doExecute:

  override def doExecute(): RDD[InternalRow] = {
    val (ctx, cleanedSource) = doCodeGen()
    println("cleanedSource#" + cleanedSource.body)  //and println to print the genereted code

I am using the code from Spark's master branch, so I am running against the latest Spark code base. I am not sure whether older versions print similar generated code.

@bartosz25 (Owner)

OK, that explains why we got different results :) I launched the code against Spark 3 preview 2 and also got a single scan stage:

[2020-01-07 06:20:10,399] org.apache.spark.internal.Logging DEBUG 
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private boolean resultIsNull_0;
/* 009 */   private java.lang.String[] mutableStateArray_0 = new java.lang.String[1];
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray_1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 011 */
/* 012 */   public SpecificUnsafeProjection(Object[] references) {
/* 013 */     this.references = references;
/* 014 */
/* 015 */     mutableStateArray_1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 32);
/* 016 */
/* 017 */   }
/* 018 */

And the plans + result:

== Parsed Logical Plan ==
'Project ['name, 'age]
+- 'Filter ('age > 2)
   +- 'UnresolvedRelation [t]

== Analyzed Logical Plan ==
name: string, age: int
Project [name#7, age#8]
+- Filter (age#8 > 2)
   +- SubqueryAlias `t`
      +- Project [_1#2 AS name#7, _2#3 AS age#8]
         +- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
LocalRelation [name#7, age#8]

== Physical Plan ==
LocalTableScan [name#7, age#8]

+----+---+
|name|age|
+----+---+
|   C|  3|
+----+---+
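A side note on the plans: the Optimized Logical Plan collapsing to a LocalRelation is, as far as I can tell, the ConvertToLocalRelation optimizer rule evaluating the Project and Filter eagerly on the local data, which is why there is no filter or project stage left to codegen. If you want the FilterExec/ProjectExec to survive in a local test like this, here is a sketch of one option, using the spark.sql.optimizer.excludedRules config available since Spark 2.4 (untested against your exact setup):

// Exclude ConvertToLocalRelation so the Project/Filter are not folded into
// the LocalRelation, leaving operators for whole-stage codegen to fuse.
val spark = SparkSession.builder()
  .master("local")
  .appName("whole stage code gen test")
  .config("spark.sql.optimizer.excludedRules",
    "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")
  .getOrCreate()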

Maybe check the preview branch, because master still seems to be a work in progress (26 127 commits vs 26 004 for the preview), or stay with 2.4.4 for now :) To debug the generated code, you can also use this tip instead of adding prints to the framework: https://www.waitingforcode.com/tips/spark-sql/how_show_generated_code :)
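A minimal sketch of that tip, assuming the debug helpers from the org.apache.spark.sql.execution.debug package (present in Spark 2.x and 3.x):

import org.apache.spark.sql.execution.debug._

// Prints each whole-stage codegen subtree of the physical plan together with
// the Java source generated for it, without modifying the Spark code base.
df.debugCodegen()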

bithw1 (Author) commented Jan 7, 2020

Thanks @bartosz25.
Yes, the debugCodegen feature prints exactly the same thing as my inserted println statement, since debugCodegen also calls the WholeStageCodegenExec#doCodeGen method.

Hmm... I am kind of surprised that we saw different output.

https://github.com/apache/spark/blob/v2.4.0/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

The above is 2.4.0; at line 549 we should see:

/* 011 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {

bartosz25 (Owner) commented Jan 7, 2020

Let's confirm first if the output is really different for you if you launch the code against 2.4.0.

> The above is 2.4.0; at line 549 we should see:
>
> /* 011 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {

Maybe there is no change in WholeStageCodegenExec but somewhere earlier in the planning? I don't follow what happens on master every day, only when I write new posts ;-) IMO, if you are trying to understand what happens, it's better to test on a stable version, or possibly a beta if you're really curious :P

Could you then try running the code on top of 2.4.0 to see if you also get a different plan than I do?
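If it helps, a hypothetical build.sbt pin for the version under test, so we both compile against the same code base:

// Pin the exact Spark version being tested.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"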
