docs(spark): add substrait-spark usage examples (#293)
mbwhite authored Oct 4, 2024
1 parent b55d8b0 commit 91ce863
Showing 22 changed files with 1,983 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
@@ -10,7 +10,7 @@ trim_trailing_whitespace = true
[*.{yaml,yml}]
indent_size = 2

-[{**/*.sql,**/OuterReferenceResolver.md,gradlew.bat}]
+[{**/*.sql,**/OuterReferenceResolver.md,**gradlew.bat}]
charset = unset
end_of_line = unset
insert_final_newline = unset
20 changes: 20 additions & 0 deletions .github/workflows/pr.yml
@@ -86,6 +86,26 @@ jobs:
uses: gradle/actions/setup-gradle@v3
- name: Build with Gradle
run: gradle build --rerun-tasks
examples:
name: Build Examples
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
submodules: recursive
- name: Set up JDK 17
uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
- uses: extractions/setup-just@v2
- name: substrait-spark
shell: bash
run: |
pwd
ls -lart
just -f ./examples/substrait-spark/justfile buildapp
isthmus-native-image-mac-linux:
name: Build Isthmus Native Image
needs: java
3 changes: 3 additions & 0 deletions .gitignore
@@ -8,3 +8,6 @@ gen
out/**
*.iws
.vscode
.pmdCache

*/bin
5 changes: 5 additions & 0 deletions examples/substrait-spark/.gitignore
@@ -0,0 +1,5 @@
spark-warehouse
derby.log
_apps
_data
bin
569 changes: 569 additions & 0 deletions examples/substrait-spark/README.md

Large diffs are not rendered by default.

40 changes: 40 additions & 0 deletions examples/substrait-spark/build.gradle.kts
@@ -0,0 +1,40 @@
plugins {
// Apply the Java plugin to build the example application, and Spotless for formatting.
id("java")
id("com.diffplug.spotless") version "6.19.0"
}

repositories {
// Use Maven Central for resolving dependencies.
mavenCentral()
}

dependencies {
implementation("org.apache.spark:spark-core_2.12:3.5.1")
implementation("io.substrait:spark:0.36.0")
implementation("io.substrait:core:0.36.0")
implementation("org.apache.spark:spark-sql_2.12:3.5.1")

// For a real Spark application, these would not be required since they would be in the Spark
// server classpath
runtimeOnly("org.apache.spark:spark-core_2.12:3.5.1")
runtimeOnly("org.apache.spark:spark-hive_2.12:3.5.1")
}

tasks.jar {
isZip64 = true
exclude("META-INF/*.RSA")
exclude("META-INF/*.SF")
exclude("META-INF/*.DSA")

duplicatesStrategy = DuplicatesStrategy.EXCLUDE
manifest.attributes["Main-Class"] = "io.substrait.examples.App"
from(configurations.runtimeClasspath.get().map({ if (it.isDirectory) it else zipTree(it) }))
}

tasks.named<Test>("test") {
// Use JUnit Platform for unit tests.
useJUnitPlatform()
}

java { toolchain { languageVersion.set(JavaLanguageVersion.of(17)) } }
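
To make the split between the two io.substrait dependencies concrete: core supplies the plan model and protobuf bindings, while spark supplies the converters to and from Catalyst logical plans. A minimal round-trip sketch (not part of this commit; the class name RoundTripSketch is hypothetical) built only from classes that appear later in this diff:

package io.substrait.examples;

import io.substrait.plan.Plan;
import io.substrait.plan.PlanProtoConverter;
import io.substrait.plan.ProtoPlanConverter;
import io.substrait.spark.logical.ToLogicalPlan;
import io.substrait.spark.logical.ToSubstraitRel;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;

/** Hypothetical sketch: round-trip a Spark logical plan through Substrait protobuf bytes. */
public final class RoundTripSketch {

  private RoundTripSketch() {}

  public static LogicalPlan roundTrip(SparkSession spark, LogicalPlan sparkPlan) throws Exception {
    Plan plan = new ToSubstraitRel().convert(sparkPlan); // Spark -> Substrait (io.substrait:spark)
    byte[] bytes = new PlanProtoConverter().toProto(plan).toByteArray(); // plan -> protobuf (io.substrait:core)
    Plan reloaded = new ProtoPlanConverter().from(io.substrait.proto.Plan.parseFrom(bytes));
    return new ToLogicalPlan(spark).convert(reloaded); // Substrait -> Spark
  }
}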
32 changes: 32 additions & 0 deletions examples/substrait-spark/docker-compose.yaml
@@ -0,0 +1,32 @@
services:
spark:
image: docker.io/bitnami/spark:3.5
user: ":${MY_GID}"
environment:
- SPARK_MODE=master
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
- SPARK_USER=spark
ports:
- '8080:8080'
volumes:
- ./_apps:/opt/spark-apps
- ./_data:/opt/spark-data
spark-worker:
image: docker.io/bitnami/spark:3.5
user: ":${MY_GID}"
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark:7077
- SPARK_WORKER_MEMORY=1G
- SPARK_WORKER_CORES=1
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
- SPARK_USER=spark
volumes:
- ./_apps:/opt/spark-apps
- ./_data:/opt/spark-data
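
The user entries above run the container processes with the caller's primary group (the justfile's spark recipe below exports MY_GID before docker compose up), which appears intended to keep files written under the ./_data bind mount group-writable on the host; the buildapp recipe makes _data group-writable for the same reason. Started by hand, the equivalent would be:

MY_GID=$(id -g) docker compose up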
60 changes: 60 additions & 0 deletions examples/substrait-spark/justfile
@@ -0,0 +1,60 @@
# Main justfile to run all the development scripts
# To install 'just' see https://github.com/casey/just#installation

# Ensure all properties are exported as shell env-vars
set export
set dotenv-load

# Set the current directory and the location of the test data
CWDIR := justfile_directory()

SPARK_VERSION := "3.5.1"

SPARK_MASTER_CONTAINER := "substrait-spark-spark-1"

_default:
@just -f {{justfile()}} --list

# Builds the application into a JAR file
buildapp:
#!/bin/bash
set -e -o pipefail

${CWDIR}/../../gradlew build

# Let the Spark container user write to the _data mount
mkdir -p ${CWDIR}/_data && chmod g+w ${CWDIR}/_data
mkdir -p ${CWDIR}/_apps

cp ${CWDIR}/build/libs/substrait-spark*.jar ${CWDIR}/_apps/app.jar
cp ${CWDIR}/src/main/resources/*.csv ${CWDIR}/_data

# Runs a Spark Dataset API query and produces a Substrait plan
dataset:
#!/bin/bash
set -e -o pipefail

docker exec -it ${SPARK_MASTER_CONTAINER} bash -c "/opt/bitnami/spark/bin/spark-submit --master spark://${SPARK_MASTER_CONTAINER}:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar SparkDataset"

# Runs a Spark SQL API query and produces a Substrait plan
sql:
#!/bin/bash
set -e -o pipefail

docker exec -it ${SPARK_MASTER_CONTAINER} bash -c "/opt/bitnami/spark/bin/spark-submit --master spark://${SPARK_MASTER_CONTAINER}:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar SparkSQL"

# Consumes the Substrait plan file passed as the argument
consume arg:
#!/bin/bash
set -e -o pipefail

docker exec -it ${SPARK_MASTER_CONTAINER} bash -c "/opt/bitnami/spark/bin/spark-submit --master spark://${SPARK_MASTER_CONTAINER}:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar SparkConsumeSubstrait {{arg}}"

# Starts a simple Spark cluster locally in Docker
spark:
#!/bin/bash
set -e -o pipefail

export MY_UID=$(id -u)
export MY_GID=$(id -g)
docker compose up
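
Taken together, a typical session with these recipes might look like this (cluster in one terminal, queries in another; the plan file name comes from the SparkDataset example below):

just spark                                  # start the local Spark cluster in Docker
just buildapp                               # build app.jar and stage it with the CSV data
just dataset                                # run the Dataset example; writes spark_dataset_substrait.plan
just consume spark_dataset_substrait.plan   # reload the saved plan and execute it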
45 changes: 45 additions & 0 deletions examples/substrait-spark/src/main/java/io/substrait/examples/App.java
@@ -0,0 +1,45 @@
package io.substrait.examples;

/** Main class */
public final class App {

/** Implemented by all examples */
public interface Action {

/**
* Runs the example
*
* @param arg optional argument for the example (may be null)
*/
void run(String arg);
}

private App() {}

/**
* Traditional main method
*
* @param args the example class name, optionally followed by a single argument
*/
public static void main(String[] args) {
try {

if (args.length == 0) {
args = new String[] {"SparkDataset"};
}
String exampleClass = args[0];

var clz = Class.forName(App.class.getPackageName() + "." + exampleClass);
var action = (Action) clz.getDeclaredConstructor().newInstance();

if (args.length == 2) {
action.run(args[1]);
} else {
action.run(null);
}

} catch (Exception e) {
e.printStackTrace();
}
}
}
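
Because main resolves its first argument to a class in io.substrait.examples by reflection, adding a new example only requires implementing App.Action. A hypothetical addition (HelloAction is not part of this commit) would be dispatched as spark-submit ... app.jar HelloAction:

package io.substrait.examples;

/** Hypothetical example action showing the reflective dispatch contract. */
public class HelloAction implements App.Action {

  @Override
  public void run(String arg) {
    // arg is the optional second command-line argument, or null if absent
    System.out.println("Hello from HelloAction, arg=" + arg);
  }
}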
46 changes: 46 additions & 0 deletions examples/substrait-spark/src/main/java/io/substrait/examples/SparkConsumeSubstrait.java
@@ -0,0 +1,46 @@
package io.substrait.examples;

import static io.substrait.examples.SparkHelper.ROOT_DIR;

import io.substrait.examples.util.SubstraitStringify;
import io.substrait.plan.Plan;
import io.substrait.plan.ProtoPlanConverter;
import io.substrait.spark.logical.ToLogicalPlan;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;

/** Minimal Spark application */
public class SparkConsumeSubstrait implements App.Action {

@Override
public void run(String arg) {

// Connect to a local in-process Spark instance
try (SparkSession spark = SparkHelper.connectLocalSpark()) {

System.out.println("Reading from " + arg);
byte[] buffer = Files.readAllBytes(Paths.get(ROOT_DIR, arg));

io.substrait.proto.Plan proto = io.substrait.proto.Plan.parseFrom(buffer);
ProtoPlanConverter protoToPlan = new ProtoPlanConverter();
Plan plan = protoToPlan.from(proto);

SubstraitStringify.explain(plan).forEach(System.out::println);

ToLogicalPlan substraitConverter = new ToLogicalPlan(spark);
LogicalPlan sparkPlan = substraitConverter.convert(plan);

System.out.println(sparkPlan);

Dataset.ofRows(spark, sparkPlan).show();

spark.stop();
} catch (IOException e) {
e.printStackTrace();
}
}
}
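
SparkHelper itself is one of the files in this commit but is not rendered in this excerpt. A plausible minimal shape for the members the examples use — the constant values here are assumptions (ROOT_DIR guessed from the /opt/spark-data mount in docker-compose.yaml), not the committed source:

package io.substrait.examples;

import org.apache.spark.sql.SparkSession;

/** Hypothetical sketch of SparkHelper; names mirror the imports above, values are guesses. */
public final class SparkHelperSketch {

  /** Assumed to match the /opt/spark-data volume in docker-compose.yaml. */
  public static final String ROOT_DIR = "/opt/spark-data";

  /** Hypothetical CSV file names; the real names are in the unrendered source. */
  public static final String VEHICLES_CSV = "vehicles.csv";

  public static final String TESTS_CSV = "tests.csv";

  private SparkHelperSketch() {}

  public static SparkSession connectLocalSpark() {
    // The master URL is supplied by spark-submit in the justfile recipes;
    // the real helper may instead set a local master for in-process runs.
    return SparkSession.builder().appName("substrait-spark-examples").getOrCreate();
  }
}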
85 changes: 85 additions & 0 deletions examples/substrait-spark/src/main/java/io/substrait/examples/SparkDataset.java
@@ -0,0 +1,85 @@
package io.substrait.examples;

import static io.substrait.examples.SparkHelper.ROOT_DIR;
import static io.substrait.examples.SparkHelper.TESTS_CSV;
import static io.substrait.examples.SparkHelper.VEHICLES_CSV;

import io.substrait.examples.util.SubstraitStringify;
import io.substrait.plan.PlanProtoConverter;
import io.substrait.spark.logical.ToSubstraitRel;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;

/** Minimal Spark application */
public class SparkDataset implements App.Action {

@Override
public void run(String arg) {

// Connect to a local in-process Spark instance
try (SparkSession spark = SparkHelper.connectLocalSpark()) {

Dataset<Row> dsVehicles;
Dataset<Row> dsTests;

// load from CSV files
String vehiclesFile = Paths.get(ROOT_DIR, VEHICLES_CSV).toString();
String testsFile = Paths.get(ROOT_DIR, TESTS_CSV).toString();

System.out.println("Reading " + vehiclesFile);
System.out.println("Reading " + testsFile);

dsVehicles = spark.read().option("delimiter", ",").option("header", "true").csv(vehiclesFile);
dsVehicles.show();

dsTests = spark.read().option("delimiter", ",").option("header", "true").csv(testsFile);
dsTests.show();

// create the joined dataset
Dataset<Row> joinedDs =
dsVehicles
.join(dsTests, dsVehicles.col("vehicle_id").equalTo(dsTests.col("vehicle_id")))
.filter(dsTests.col("test_result").equalTo("P"))
.groupBy(dsVehicles.col("colour"))
.count();

joinedDs = joinedDs.orderBy(joinedDs.col("count"));
joinedDs.show();

LogicalPlan plan = joinedDs.queryExecution().optimizedPlan();

System.out.println(plan);
createSubstrait(plan);

spark.stop();
} catch (Exception e) {
e.printStackTrace(System.out);
}
}

/**
* Create substrait plan and save to file based on logical plan
*
* @param enginePlan logical plan
*/
public void createSubstrait(LogicalPlan enginePlan) {
ToSubstraitRel toSubstrait = new ToSubstraitRel();
io.substrait.plan.Plan plan = toSubstrait.convert(enginePlan);

SubstraitStringify.explain(plan).forEach(System.out::println);

PlanProtoConverter planToProto = new PlanProtoConverter();
byte[] buffer = planToProto.toProto(plan).toByteArray();
try {
Files.write(Paths.get(ROOT_DIR, "spark_dataset_substrait.plan"), buffer);
System.out.println("File written to " + Paths.get(ROOT_DIR, "spark_sql_substrait.plan"));
} catch (IOException e) {
e.printStackTrace(System.out);
}
}
}
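
The SparkSQL example driven by the justfile's sql recipe is likewise part of this commit but not rendered here. Judging from the dataset example above, a hypothetical sketch (class, view names, and paths invented; the real source may differ) would express the same join/filter/aggregate in SQL and reuse the same conversion path:

package io.substrait.examples;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/** Hypothetical sketch of the SparkSQL example, not the committed source. */
public class SparkSqlSketch implements App.Action {

  @Override
  public void run(String arg) {
    try (SparkSession spark = SparkHelper.connectLocalSpark()) {
      // Register the CSV files as temporary views (paths are assumptions)
      spark.read().option("header", "true").csv("/opt/spark-data/vehicles.csv")
          .createOrReplaceTempView("vehicles");
      spark.read().option("header", "true").csv("/opt/spark-data/tests.csv")
          .createOrReplaceTempView("tests");

      // Same query as the Dataset example, expressed in SQL
      Dataset<Row> result =
          spark.sql(
              "SELECT v.colour, count(*) AS count FROM tests t"
                  + " JOIN vehicles v ON t.vehicle_id = v.vehicle_id"
                  + " WHERE t.test_result = 'P' GROUP BY v.colour ORDER BY count");
      result.show();

      // From here the flow matches createSubstrait() above:
      // new ToSubstraitRel().convert(result.queryExecution().optimizedPlan()), etc.
    } catch (Exception e) {
      e.printStackTrace(System.out);
    }
  }
}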