
Commit

feat: addressed comments and added missing code
Signed-off-by: MBWhite <[email protected]>
mbwhite authored and vbarua committed Oct 3, 2024
1 parent b2732fa commit 3f01a2a
Showing 12 changed files with 907 additions and 37 deletions.
1 change: 1 addition & 0 deletions examples/substrait-spark/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ spark-warehouse
derby.log
_apps
_data
bin
23 changes: 18 additions & 5 deletions examples/substrait-spark/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,16 @@ The `justfile` has three targets to make it easy to run the examples
- `just dataset` runs the Dataset API and produces `spark_dataset_substrait.plan`
- `just sql` runs the SQL api and produces `spark_sql_substrait.plan`
- `just consume <planfile>` runs the specified plan (from the `_data` directory)


- run `just` without arguments to get a summary of the targets available.
```
just
Available recipes:
buildapp # Builds the application into a JAR file
consume arg # Consumes the Substrait plan file passed as the argument
dataset # Runs a Spark dataset api query and produces a Substrait plan
spark # Starts a simple Spark cluster locally in docker
sql # Runs a Spark SQL api query and produces a Substrait plan
```


## Creating a Substrait Plan
Expand Down Expand Up @@ -266,7 +274,7 @@ The `io.substrait.plan.Plan` object is a high-level Substrait POJO representing

For the dataset approach, `spark_dataset_substrait.plan` is created, and for the SQL approach, `spark_sql_substrait.plan`. These intermediate representations of the query can be saved, transferred, and reloaded into a data engine.
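
For reference, the plans are written out by converting the POJO plan back into protobuf bytes; a minimal sketch based on the `PlanProtoConverter` usage elsewhere in this commit (the output path here is illustrative):

```java
// Convert the high-level POJO plan into its protobuf form and serialise it to disk.
PlanProtoConverter planToProto = new PlanProtoConverter();
byte[] buffer = planToProto.toProto(plan).toByteArray();
Files.write(Paths.get("_data/spark_dataset_substrait.plan"), buffer); // illustrative path
```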

We can also review the Substrait plan's structure; the canonical format of the Substrait plan is the binary protobuf format, but it's possible to produce a textual version, an example is below. Both the Substrait plans from the Dataset or SQL APIs generate the same output.
We can also review the Substrait plan's structure; the canonical format of the Substrait plan is the binary protobuf format, but it's possible to produce a textual version; an example is below (please see the [SubstraitStringify utility class](./src/main/java/io/substrait/examples/util/SubstraitStringify.java); it's also a good example of how to use some of the visitor patterns). Both the Substrait plans from the Dataset and SQL APIs generate the same output.
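
The textual version is produced by running the utility over the POJO plan, as the example applications in this commit do:

```java
// Print a readable, line-by-line rendering of the Substrait POJO plan.
SubstraitStringify.explain(plan).forEach(System.out::println);
```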

```
<Substrait Plan>
Expand Down Expand Up @@ -318,7 +326,9 @@ Loading the binary protobuf file is the reverse of the writing process (in the c
ProtoPlanConverter protoToPlan = new ProtoPlanConverter();
Plan plan = protoToPlan.from(proto);
```
The loaded byte array is first converted into the protobuf Plan, and then into the Substrait Plan object. Note it can be useful to name the variables, and/or use the pull class names to keep track of it's the ProtoBuf Plan or the high-level POJO Plan.

The loaded byte array is first converted into the protobuf Plan, and then into the Substrait Plan object. Note it can be useful to name the variables, and/or use the full class names, to keep track of whether it's the protobuf Plan or the high-level POJO Plan, for example `io.substrait.proto.Plan` or `io.substrait.plan.Plan`.
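
Putting the loading step together, a minimal sketch looks like this (assuming the plan bytes are read from a file; the path is illustrative):

```java
// Read the serialised plan, parse the protobuf form, then build the POJO form.
byte[] buffer = Files.readAllBytes(Paths.get("_data/spark_sql_substrait.plan"));
io.substrait.proto.Plan proto = io.substrait.proto.Plan.parseFrom(buffer);
io.substrait.plan.Plan plan = new ProtoPlanConverter().from(proto);
```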


Finally this can be converted to a Spark Plan:
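
A minimal sketch of that conversion, mirroring the `ToLogicalPlan` usage in `SparkConsumeSubstrait.java` from this commit (`spark` is the active `SparkSession`):

```java
// Convert the Substrait POJO plan into a Spark logical plan that the engine can execute.
ToLogicalPlan substraitConverter = new ToLogicalPlan(spark);
LogicalPlan sparkPlan = substraitConverter.convert(plan);
```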

Expand Down Expand Up @@ -395,6 +405,9 @@ When converted to Substrait the Sort and Aggregate is in the same order, but the
+- Aggregate:: FieldRef#/Str/StructField{offset=0}
```

These look different for two reasons. First, the Spark optimizer has swapped the order of the project and aggregate operations.
Second, projects within the Substrait plan join fields together but do not reduce the number of fields; any such filtering is done in the outer relations.

### Inner Join

Spark's inner join takes the two filtered relations as its inputs; the filters ensure the join key is not null and also apply the `test_result == P` check.
Expand All @@ -408,7 +421,7 @@ Spark's inner join is taking as inputs the two filtered relations; it's ensuring
+- Filter ((isnotnull(test_result#14) AND (test_result#14 = P)) AND isnotnull(vehicle_id#10))
```

The Substrait Representation looks longer, but is showing the same structure. (note that this format is a custom format implemented as [SubstraitStingify](...) as the standard text output can be hard to read).
The Substrait representation looks longer, but shows the same structure. (Note that this is a custom format implemented by [SubstraitStringify](./src/main/java/io/substrait/examples/util/SubstraitStringify.java), as the standard text output can be hard to read.)

```
+- Join:: INNER <ScalarFn>equal:any_any
Expand Down
6 changes: 5 additions & 1 deletion examples/substrait-spark/justfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ SPARK_MASTER_CONTAINER := "substrait-spark-spark-1"
_default:
@just -f {{justfile()}} --list

# Builds the application into a JAR file
buildapp:
#!/bin/bash
set -e -o pipefail
Expand All @@ -28,25 +29,28 @@ buildapp:
cp ${CWDIR}/build/libs/substrait-spark*.jar ${CWDIR}/_apps/app.jar
cp ${CWDIR}/src/main/resources/*.csv ${CWDIR}/_data

# Runs a Spark dataset api query and produces a Substrait plan
dataset:
#!/bin/bash
set -e -o pipefail

docker exec -it ${SPARK_MASTER_CONTAINER} bash -c "/opt/bitnami/spark/bin/spark-submit --master spark://${SPARK_MASTER_CONTAINER}:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar SparkDataset"

# Runs a Spark SQL api query and produces a Substrait plan
sql:
#!/bin/bash
set -e -o pipefail

docker exec -it ${SPARK_MASTER_CONTAINER} bash -c "/opt/bitnami/spark/bin/spark-submit --master spark://${SPARK_MASTER_CONTAINER}:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar SparkSQL"

# Consumes the Substrait plan file passed as the argument
consume arg:
#!/bin/bash
set -e -o pipefail

docker exec -it ${SPARK_MASTER_CONTAINER} bash -c "/opt/bitnami/spark/bin/spark-submit --master spark://${SPARK_MASTER_CONTAINER}:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar SparkConsumeSubstrait {{arg}}"


# Starts a simple Spark cluster locally in docker
spark:
#!/bin/bash
set -e -o pipefail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static io.substrait.examples.SparkHelper.ROOT_DIR;

import io.substrait.examples.util.SubstraitStringify;
import io.substrait.plan.Plan;
import io.substrait.plan.ProtoPlanConverter;
import io.substrait.spark.logical.ToLogicalPlan;
Expand All @@ -28,6 +29,8 @@ public void run(String arg) {
ProtoPlanConverter protoToPlan = new ProtoPlanConverter();
Plan plan = protoToPlan.from(proto);

SubstraitStringify.explain(plan).forEach(System.out::println);

ToLogicalPlan substraitConverter = new ToLogicalPlan(spark);
LogicalPlan sparkPlan = substraitConverter.convert(plan);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static io.substrait.examples.SparkHelper.TESTS_CSV;
import static io.substrait.examples.SparkHelper.VEHICLES_CSV;

import io.substrait.examples.util.SubstraitStringify;
import io.substrait.plan.PlanProtoConverter;
import io.substrait.spark.logical.ToSubstraitRel;
import java.io.IOException;
Expand Down Expand Up @@ -70,7 +71,7 @@ public void createSubstrait(LogicalPlan enginePlan) {
ToSubstraitRel toSubstrait = new ToSubstraitRel();
io.substrait.plan.Plan plan = toSubstrait.convert(enginePlan);

System.out.println(plan);
SubstraitStringify.explain(plan).forEach(System.out::println);

PlanProtoConverter planToProto = new PlanProtoConverter();
byte[] buffer = planToProto.toProto(plan).toByteArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,12 @@ public final class SparkHelper {

private SparkHelper() {}

/** Namespace to use for the data */
public static final String NAMESPACE = "demo_db";

/** Vehicles table */
public static final String VEHICLE_TABLE = "vehicles";

/** Tests table (the vehicle safety tests) */
public static final String TESTS_TABLE = "tests";

/** Source data - parquet */
public static final String VEHICLES_PQ = "vehicles_subset_2023.parquet";

/** Source data - parquet */
public static final String TESTS_PQ = "tests_subset_2023.parquet";

/** Source data - csv */
public static final String VEHICLES_CSV = "vehicles_subset_2023.csv";

Expand All @@ -31,26 +22,6 @@ private SparkHelper() {}
/** In-container data location */
public static final String ROOT_DIR = "/opt/spark-data";

/**
* Connect to local spark for demo purposes
*
* @param sparkMaster address of the Spark Master to connect to
* @return SparkSession
*/
public static SparkSession connectSpark(String sparkMaster) {

SparkSession spark =
SparkSession.builder()
// .config("spark.sql.warehouse.dir", "spark-warehouse")
.config("spark.master", sparkMaster)
.enableHiveSupport()
.getOrCreate();

spark.sparkContext().setLogLevel("ERROR");

return spark;
}

/**
* Connects to the local Spark cluster
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static io.substrait.examples.SparkHelper.VEHICLES_CSV;
import static io.substrait.examples.SparkHelper.VEHICLE_TABLE;

import io.substrait.examples.util.SubstraitStringify;
import io.substrait.plan.PlanProtoConverter;
import io.substrait.spark.logical.ToSubstraitRel;
import java.io.IOException;
Expand Down Expand Up @@ -76,7 +77,8 @@ public void run(String arg) {
public void createSubstrait(LogicalPlan enginePlan) {
ToSubstraitRel toSubstrait = new ToSubstraitRel();
io.substrait.plan.Plan plan = toSubstrait.convert(enginePlan);
System.out.println(plan);

SubstraitStringify.explain(plan).forEach(System.out::println);

PlanProtoConverter planToProto = new PlanProtoConverter();
byte[] buffer = planToProto.toProto(plan).toByteArray();
Expand Down
