From 91ce8636e0fae0ff7e7398af4c56294f1f4f0762 Mon Sep 17 00:00:00 2001 From: Matthew B White Date: Fri, 4 Oct 2024 15:29:46 +0100 Subject: [PATCH] docs(spark): add substrait-spark usage examples (#293) --- .editorconfig | 2 +- .github/workflows/pr.yml | 20 + .gitignore | 3 + examples/substrait-spark/.gitignore | 5 + examples/substrait-spark/README.md | 569 ++++++++++++++++++ examples/substrait-spark/build.gradle.kts | 40 ++ examples/substrait-spark/docker-compose.yaml | 32 + examples/substrait-spark/justfile | 60 ++ .../main/java/io/substrait/examples/App.java | 45 ++ .../examples/SparkConsumeSubstrait.java | 46 ++ .../io/substrait/examples/SparkDataset.java | 85 +++ .../io/substrait/examples/SparkHelper.java | 38 ++ .../java/io/substrait/examples/SparkSQL.java | 93 +++ .../examples/util/ExpressionStringify.java | 276 +++++++++ .../examples/util/FunctionArgStringify.java | 31 + .../examples/util/ParentStringify.java | 57 ++ .../examples/util/SubstraitStringify.java | 339 +++++++++++ .../examples/util/TypeStringify.java | 175 ++++++ .../src/main/resources/tests_subset_2023.csv | 30 + .../main/resources/vehicles_subset_2023.csv | 31 + readme.md | 6 + settings.gradle.kts | 2 +- 22 files changed, 1983 insertions(+), 2 deletions(-) create mode 100644 examples/substrait-spark/.gitignore create mode 100644 examples/substrait-spark/README.md create mode 100644 examples/substrait-spark/build.gradle.kts create mode 100644 examples/substrait-spark/docker-compose.yaml create mode 100644 examples/substrait-spark/justfile create mode 100644 examples/substrait-spark/src/main/java/io/substrait/examples/App.java create mode 100644 examples/substrait-spark/src/main/java/io/substrait/examples/SparkConsumeSubstrait.java create mode 100644 examples/substrait-spark/src/main/java/io/substrait/examples/SparkDataset.java create mode 100644 examples/substrait-spark/src/main/java/io/substrait/examples/SparkHelper.java create mode 100644 examples/substrait-spark/src/main/java/io/substrait/examples/SparkSQL.java create mode 100644 examples/substrait-spark/src/main/java/io/substrait/examples/util/ExpressionStringify.java create mode 100644 examples/substrait-spark/src/main/java/io/substrait/examples/util/FunctionArgStringify.java create mode 100644 examples/substrait-spark/src/main/java/io/substrait/examples/util/ParentStringify.java create mode 100644 examples/substrait-spark/src/main/java/io/substrait/examples/util/SubstraitStringify.java create mode 100644 examples/substrait-spark/src/main/java/io/substrait/examples/util/TypeStringify.java create mode 100644 examples/substrait-spark/src/main/resources/tests_subset_2023.csv create mode 100644 examples/substrait-spark/src/main/resources/vehicles_subset_2023.csv diff --git a/.editorconfig b/.editorconfig index cc987b518..ce1567087 100644 --- a/.editorconfig +++ b/.editorconfig @@ -10,7 +10,7 @@ trim_trailing_whitespace = true [*.{yaml,yml}] indent_size = 2 -[{**/*.sql,**/OuterReferenceResolver.md,gradlew.bat}] +[{**/*.sql,**/OuterReferenceResolver.md,**gradlew.bat}] charset = unset end_of_line = unset insert_final_newline = unset diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 2feebebea..877510d15 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -86,6 +86,26 @@ jobs: uses: gradle/actions/setup-gradle@v3 - name: Build with Gradle run: gradle build --rerun-tasks + examples: + name: Build Examples + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + - name: Set up JDK 17 + uses: 
actions/setup-java@v4 + with: + java-version: '17' + distribution: 'temurin' + - uses: extractions/setup-just@v2 + - name: substrait-spark + shell: bash + run: | + pwd + ls -lart + just -f ./examples/substrait-spark/justfile buildapp + isthmus-native-image-mac-linux: name: Build Isthmus Native Image needs: java diff --git a/.gitignore b/.gitignore index d7d9428ea..50c4f73e5 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,6 @@ gen out/** *.iws .vscode +.pmdCache + +*/bin diff --git a/examples/substrait-spark/.gitignore b/examples/substrait-spark/.gitignore new file mode 100644 index 000000000..4eb18b0c4 --- /dev/null +++ b/examples/substrait-spark/.gitignore @@ -0,0 +1,5 @@ +spark-warehouse +derby.log +_apps +_data +bin diff --git a/examples/substrait-spark/README.md b/examples/substrait-spark/README.md new file mode 100644 index 000000000..d9885dd83 --- /dev/null +++ b/examples/substrait-spark/README.md @@ -0,0 +1,569 @@ +# Introduction to the Substrait-Spark library + +The Substrait-Spark library allows Substrait plans to convert to and from Spark Plans. This example will show how this can be used. + +## How does this work in practice? + +Once Spark SQL and Spark DataFrame APIs queries have been created, Spark's optimized query plan can be used to generate Substrait plans; and Substrait Plans can be executed on a Spark cluster. Below is a description of how to use this library; there are two sample datasets included for demonstration. + +The most commonly used logical relations are supported, including those generated from all the TPC-H queries, but there are currently some gaps in support that prevent all the TPC-DS queries from being translatable. + + +## Running the examples + +There are 3 example classes: + +- [SparkDataset](./app/src/main/java/io/substrait/examples/SparkDataset.java) that creates a plan starting with the Spark Dataset API +- [SparkSQL](./app/src/main/java/io/substrait/examples/SparkSQL.java) that creates a plan starting with the Spark SQL API +- [SparkConsumeSubstrait](./app/src/main/java/io/substrait/examples/SparkConsumeSubstrait.java) that loads a Substrait plan and executes it + + + +### Requirements + +To run these you will need: + +- Java 17 or greater +- Docker to start a test Spark Cluster + - You could use your own cluster, but would need to adjust file locations defined in [SparkHelper](./app/src/main/java/io/substrait/examples/SparkHelper.java) +- The [just task runner](https://github.com/casey/just#installation) is optional, but very helpful to run the bash commands +- [Two datafiles](./app/src/main/resources/) are provided (CSV format) + +For building using the `substrait-spark` library youself, using the [mvn repository](https://mvnrepository.com/artifact/io.substrait/spark) + +Using Maven: +```xml + + + io.substrait + spark + 0.36.0 + +``` + +Using Gradle (Groovy) +```groovy +// https://mvnrepository.com/artifact/io.substrait/spark +implementation 'io.substrait:spark:0.36.0' +``` + +### Setup configuration + +Firstly the application needs to be built; this is a simple Java application. As well issuing the `gradle` build command it also creates two directories `_apps` and `_data`. The JAR file and will be copied to the `_apps` directory and the datafiles to the `_data`. 
Note that the permissions on the `_data` directory are set to group write - this allows the Spark process in the docker container to write the output plan.

To run using `just`:
```
just buildapp

# or

./gradlew build
mkdir -p ./_data && chmod g+w ./_data
mkdir -p ./_apps

cp ./app/build/libs/app.jar ./_apps
cp ./app/src/main/resources/*.csv ./_data

```

- In the `_data` directory there are now two csv files [tests_subset_2023.csv](./app/src/main/resources/tests_subset_2023.csv) and [vehicles_subset_2023.csv](./app/src/main/resources/vehicles_subset_2023.csv)


Second, you can start the basic Spark cluster - this uses `docker compose`. It is best to start this in a separate window:

```
just spark
```

- In [SparkHelper](./app/src/main/java/io/substrait/examples/SparkHelper.java) there are constants defined to match these locations

```java
  public static final String VEHICLES_CSV = "vehicles_subset_2023.csv";
  public static final String TESTS_CSV = "tests_subset_2023.csv";
  public static final String ROOT_DIR = "/opt/spark-data";
```

- To run the application, `exec` into the Spark master node and issue `spark-submit`

```
docker exec -it substrait-spark-spark-1 bash
/opt/bitnami/spark/bin/spark-submit --master spark://substrait-spark-spark-1:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar
```

The `justfile` has three targets to make it easy to run the examples:

- `just dataset` runs the Dataset API and produces `spark_dataset_substrait.plan`
- `just sql` runs the SQL API and produces `spark_sql_substrait.plan`
- `just consume <plan file>` runs the specified plan (from the `_data` directory)
- run `just` without arguments to get a summary of the targets available.
```
just
Available recipes:
    buildapp    # Builds the application into a JAR file
    consume arg # Consumes the Substrait plan file passed as the argument
    dataset     # Runs a Spark dataset api query and produces a Substrait plan
    spark       # Starts a simple Spark cluster locally in docker
    sql         # Runs a Spark SQL api query and produces a Substrait plan
```


## Creating a Substrait Plan

[SparkSQL](./app/src/main/java/io/substrait/examples/SparkSQL.java) is a simple use of SQL to join the two tables; after reading the two CSV files, the SQL query is defined and then run on Spark.

### Loading data

Firstly the filenames are created and the CSV files read. Temporary views need to be created to refer to these tables in the SQL query.

```java
    String vehiclesFile = Paths.get(ROOT_DIR, VEHICLES_CSV).toString();
    String testsFile = Paths.get(ROOT_DIR, TESTS_CSV).toString();

    spark.read().option("delimiter", ",").option("header", "true").csv(vehiclesFile)
        .createOrReplaceTempView(VEHICLE_TABLE);
    spark.read().option("delimiter", ",").option("header", "true").csv(testsFile)
        .createOrReplaceTempView(TESTS_TABLE);
```

### Creating the SQL query

The following standard SQL query string finds the counts of all vehicles (grouped by colour) that have passed the vehicle safety test.

```java
    String sqlQuery = """
          SELECT vehicles.colour, count(*) as colourcount
          FROM vehicles
          INNER JOIN tests ON vehicles.vehicle_id=tests.vehicle_id
          WHERE tests.test_result = 'P'
          GROUP BY vehicles.colour
          ORDER BY count(*)
        """;
    var result = spark.sql(sqlQuery);
    result.show();
```

If we were to just run this as-is, the output would be the table below.
+``` ++------+-----------+ +|colour|colourcount| ++------+-----------+ +| GREEN| 1| +|BRONZE| 1| +| RED| 2| +| BLACK| 2| +| GREY| 2| +| BLUE| 2| +|SILVER| 3| +| WHITE| 5| ++------+-----------+ +``` + +### Logical and Optimized Query Plans + +The next step is to look at the logical and optimised query plans that Spark has constructed. + +```java + LogicalPlan logical = result.logicalPlan(); + System.out.println(logical); + + LogicalPlan optimised = result.queryExecution().optimizedPlan(); + System.out.println(optimised); +``` + +The logical plan will be: + +``` +Sort [colourcount#30L ASC NULLS FIRST], true ++- Aggregate [colour#3], [colour#3, count(1) AS colourcount#30L] + +- Filter (test_result#19 = P) + +- Join Inner, (vehicle_id#0L = vehicle_id#15L) + :- SubqueryAlias vehicles + : +- View (`vehicles`, [vehicle_id#0L,make#1,model#2,colour#3,fuel_type#4,cylinder_capacity#5L,first_use_date#6]) + : +- Relation [vehicle_id#0L,make#1,model#2,colour#3,fuel_type#4,cylinder_capacity#5L,first_use_date#6] csv + +- SubqueryAlias tests + +- View (`tests`, [test_id#14L,vehicle_id#15L,test_date#16,test_class#17,test_type#18,test_result#19,test_mileage#20L,postcode_area#21]) + +- Relation [test_id#14L,vehicle_id#15L,test_date#16,test_class#17,test_type#18,test_result#19,test_mileage#20L,postcode_area#21] csv +``` + +Similarly, the optimized plan can be found; here the `SubQuery` and `View` have been converted into Project and Filter + +``` +Sort [colourcount#30L ASC NULLS FIRST], true ++- Aggregate [colour#3], [colour#3, count(1) AS colourcount#30L] + +- Project [colour#3] + +- Join Inner, (vehicle_id#0L = vehicle_id#15L) + :- Project [vehicle_id#0L, colour#3] + : +- Filter isnotnull(vehicle_id#0L) + : +- Relation [vehicle_id#0L,make#1,model#2,colour#3,fuel_type#4,cylinder_capacity#5L,first_use_date#6] csv + +- Project [vehicle_id#15L] + +- Filter ((isnotnull(test_result#19) AND (test_result#19 = P)) AND isnotnull(vehicle_id#15L)) + +- Relation [test_id#14L,vehicle_id#15L,test_date#16,test_class#17,test_type#18,test_result#19,test_mileage#20L,postcode_area#21] csv +``` + +### Dataset API + +Alternatively, the Dataset API can be used to create the plans, the code for this in [`SparkDataset`](./app/src/main/java/io/substrait/examples/SparkDataset.java). The overall flow of the code is very similar + +Rather than create a temporary view, the reference to the datasets are kept in `dsVehicles` and `dsTests` +```java + dsVehicles = spark.read().option("delimiter", ",").option("header", "true").csv(vehiclesFile); + dsVehicles.show(); + + dsTests = spark.read().option("delimiter", ",").option("header", "true").csv(testsFile); + dsTests.show(); +``` + +They query can be constructed based on these two datasets + +```java + Dataset joinedDs = dsVehicles.join(dsTests, dsVehicles.col("vehicle_id").equalTo(dsTests.col("vehicle_id"))) + .filter(dsTests.col("test_result").equalTo("P")) + .groupBy(dsVehicles.col("colour")) + .count(); + + joinedDs = joinedDs.orderBy(joinedDs.col("count")); + joinedDs.show(); +``` + +Using the same APIs, the Spark's optimized plan is available. If you compare this to the plan above you will see that structurally it is identical. 
+ +``` +Sort [count#189L ASC NULLS FIRST], true ++- Aggregate [colour#20], [colour#20, count(1) AS count#189L] + +- Project [colour#20] + +- Join Inner, (vehicle_id#17 = vehicle_id#86) + :- Project [vehicle_id#17, colour#20] + : +- Filter isnotnull(vehicle_id#17) + : +- Relation [vehicle_id#17,make#18,model#19,colour#20,fuel_type#21,cylinder_capacity#22,first_use_date#23] csv + +- Project [vehicle_id#86] + +- Filter ((isnotnull(test_result#90) AND (test_result#90 = P)) AND isnotnull(vehicle_id#86)) + +- Relation [test_id#85,vehicle_id#86,test_date#87,test_class#88,test_type#89,test_result#90,test_mileage#91,postcode_area#92] csv +``` + +### Substrait Creation + +This optimized plan is the best starting point to produce a Substrait Plan; there's a `createSubstrait(..)` function that does the work and produces a binary protobuf Substrait file. + +``` + LogicalPlan optimised = result.queryExecution().optimizedPlan(); + System.out.println(optimised); + + createSubstrait(optimised); +``` + +Let's look at the APIs in the `createSubstrait(...)` method to see how it's using the `Substrait-Spark` Library. + +```java + ToSubstraitRel toSubstrait = new ToSubstraitRel(); + io.substrait.plan.Plan plan = toSubstrait.convert(enginePlan); +``` + +`ToSubstraitRel` is the main class and provides the convert method; this takes the Spark plan (optimized plan is best) and produces the Substrait Plan. The most common relations are supported currently - and the optimized plan is more likely to use these. + +The `io.substrait.plan.Plan` object is a high-level Substrait POJO representing a plan. This could be used directly or more likely be persisted. protobuf is the canonical serialization form. It's easy to convert this and store in a file + +```java + PlanProtoConverter planToProto = new PlanProtoConverter(); + byte[] buffer = planToProto.toProto(plan).toByteArray(); + try { + Files.write(Paths.get(ROOT_DIR, "spark_sql_substrait.plan"),buffer); + } catch (IOException e){ + e.printStackTrace(); + } +``` + +For the dataset approach, the `spark_dataset_substrait.plan` is created, and for the SQL approach the `spark_sql_substrait.plan` is created. These Intermediate Representations of the query can be saved, transferred and reloaded into a Data Engine. + +We can also review the Substrait plan's structure; the canonical format of the Substrait plan is the binary protobuf format, but it's possible to produce a textual version, an example is below (please see the [SubstraitStringify utility class](./src/main/java/io/substrait/examples/util/SubstraitStringify.java); it's also a good example of how to use some of the visitor patterns). Both the Substrait plans from the Dataset or SQL APIs generate the same output. 
+ +``` + +Root :: ImmutableSort [colour, count] + ++- Sort:: FieldRef#/I64/StructField{offset=1} ASC_NULLS_FIRST + +- Project:: [Str, I64, Str, I64] + +- Aggregate:: FieldRef#/Str/StructField{offset=0} + +- Project:: [Str, Str, Str, Str] + +- Join:: INNER equal:any_any + : arg0 = FieldRef#/Str/StructField{offset=0} + : arg1 = FieldRef#/Str/StructField{offset=2} + +- Project:: [Str, Str, Str, Str, Str, Str, Str, Str, Str] + +- Filter:: is_not_null:any + : arg0 = FieldRef#/Str/StructField{offset=0} + +- LocalFiles:: + : file:///opt/spark-data/vehicles_subset_2023.csv len=1547 partition=0 start=0 + +- Project:: [Str, Str, Str, Str, Str, Str, Str, Str, Str] + +- Filter:: and:bool + : arg0 = and:bool + : arg0 = is_not_null:any + : arg0 = FieldRef#/Str/StructField{offset=5} + : arg1 = equal:any_any + : arg0 = FieldRef#/Str/StructField{offset=5} + : arg1 = + : arg1 = is_not_null:any + : arg0 = FieldRef#/Str/StructField{offset=1} + +- LocalFiles:: + : file:///opt/spark-data/tests_subset_2023.csv len=1491 partition=0 start=0 +``` + +There is more detail in this version than the Spark version; details of the functions called for example are included. However, the structure of the overall plan is identical with 1 exception. There is an additional `project` relation included between the `sort` and `aggregate` - this is necessary to get the correct types of the output data. + +We can also see in this case as the plan came from Spark directly it's also included the location of the datafiles. Below when we reload this into Spark, the locations of the files don't need to be explicitly included. + + +As the `Substrait Spark` library also allows plans to be loaded and executed, so the next step is to consume these Substrait plans. + +## Consuming a Substrait Plan + +The [`SparkConsumeSubstrait`](./app/src/main/java/io/substrait/examples/SparkConsumeSubstrait.java) code shows how to load this file, and most importantly how to convert it to a Spark engine plan to execute. + +Loading the binary protobuf file is the reverse of the writing process (in the code the file name comes from a command line argument, here we're showing the hardcoded file name ) + +```java + byte[] buffer = Files.readAllBytes(Paths.get("spark_sql_substrait.plan")); + io.substrait.proto.Plan proto = io.substrait.proto.Plan.parseFrom(buffer); + + ProtoPlanConverter protoToPlan = new ProtoPlanConverter(); + Plan plan = protoToPlan.from(proto); +``` + +The loaded byte array is first converted into the protobuf Plan, and then into the Substrait Plan object. Note it can be useful to name the variables, and/or use the full class names to keep track whether it's the ProtoBuf Plan or the high-level POJO Plan. For example `io.substrait.proto.Plan` or `io.substrait.Plan` + + +Finally this can be converted to a Spark Plan: + +```java + ToLogicalPlan substraitConverter = new ToLogicalPlan(spark); + LogicalPlan sparkPlan = substraitConverter.convert(plan); +``` + +If you were to print out this plan, it has the identical structure to the plan seen earlier on. 
+ +``` ++- Sort [count(1)#18L ASC NULLS FIRST], true + +- Aggregate [colour#5], [colour#5, count(1) AS count(1)#18L] + +- Project [colour#5] + +- Join Inner, (vehicle_id#2 = vehicle_id#10) + :- Project [vehicle_id#2, colour#5] + : +- Filter isnotnull(vehicle_id#2) + : +- Relation [vehicle_id#2,make#3,model#4,colour#5,fuel_type#6,cylinder_capacity#7,first_use_date#8] csv + +- Project [vehicle_id#10] + +- Filter ((isnotnull(test_result#14) AND (test_result#14 = P)) AND isnotnull(vehicle_id#10)) + +- Relation [test_id#9,vehicle_id#10,test_date#11,test_class#12,test_type#13,test_result#14,test_mileage#15,postcode_area#16] csv +``` + +Execution of this plan is then a simple `Dataset.ofRows(spark, sparkPlan).show();` giving the output of + +```java ++------+-----+ +|colour|count| ++------+-----+ +| GREEN| 1| +|BRONZE| 1| +| RED| 2| +| BLACK| 2| +| GREY| 2| +| BLUE| 2| +|SILVER| 3| +| WHITE| 5| ++------+-----+ +``` + +### Observations + +To recap on the steps above + +- Two CSV files have been loaded into Spark +- Using either the Spark SQL or the Spark Dataset API we can produce a query across those two datasets +- Both queries result in Spark creating a logical and optimized query plan + - And both being are structurally identical +- Using the Substrait-Java library, we can convert the optimized plan into the Substrait format. +- This Substrait intermediate representation of the query can be serialized via the protobuf format + - Here store as a flat file containing the bytes of that protobuf +- *Separately* this file can be loaded and the Substrait Plan converted to a Spark Plan +- This can be run in an application on Spark getting the same results + +--- +## Plan Comparison + +The structure of the query plans for both Spark and Substrait are structurally very similar. + + +### Inner Join + +Spark's inner join is taking as inputs the two filtered relations; it's ensuring the join key is not null but also the `test_result==p` check. + +``` + +- Join Inner, (vehicle_id#2 = vehicle_id#10) + :- Project [vehicle_id#2, colour#5] + : +- Filter isnotnull(vehicle_id#2) + + +- Project [vehicle_id#10] + +- Filter ((isnotnull(test_result#14) AND (test_result#14 = P)) AND isnotnull(vehicle_id#10)) +``` + +The Substrait Representation looks longer, but is showing the same structure. (note that this format is a custom format implemented as [SubstraitStingify](./src/main/java/io/substrait/examples/util/SubstraitStringify.java) as the standard text output can be hard to read). + +``` + +- Join:: INNER equal:any_any + : arg0 = FieldRef#/Str/StructField{offset=0} + : arg1 = FieldRef#/Str/StructField{offset=2} + +- Project:: [Str, Str, Str, Str, Str, Str, Str, Str, Str] + +- Filter:: is_not_null:any + : arg0 = FieldRef#/Str/StructField{offset=0} + + +- Project:: [Str, Str, Str, Str, Str, Str, Str, Str, Str] + +- Filter:: and:bool + : arg0 = and:bool + : arg0 = is_not_null:any + : arg0 = FieldRef#/Str/StructField{offset=5} + : arg1 = equal:any_any + : arg0 = FieldRef#/Str/StructField{offset=5} + : arg1 = + : arg1 = is_not_null:any + : arg0 = FieldRef#/Str/StructField{offset=1} +``` + +### LocalFiles + +The source of the data originally was two csv files; in the Spark plan this is referred to by csv suffix: ` Relation [...] 
csv`; this is represented in the Substrait plan as

```
 +- LocalFiles::
      :   file:///opt/spark-data/tests_subset_2023.csv len=1491 partition=0 start=0
```

There is a dedicated Substrait `ReadRel` relation for referencing files; it includes additional information about the type of the file, its size, format, and options for reading those specific formats. Parquet/Arrow/Orc/ProtoBuf/Dwrf currently all have specific option structures.

## Data Locations

The implication of a relation that includes a filename is seen when the plan is deserialized and executed; the binary Substrait plan needs to be read, converted into a Substrait Plan POJO and passed to the Substrait-Spark library to be converted. Once converted it can be executed directly.

The plan itself contains all the information needed to execute the query.

A slight difference is observed when the Spark DataFrame is saved as a Hive table. Using `saveAsTable(...)` and `table(...)` the data can be persisted.

```java
    String vehiclesFile = Paths.get(ROOT_DIR, VEHICLES_CSV).toString();
    Dataset<Row> dsVehicles = spark.read().option("delimiter", ",").option("header", "true").csv(vehiclesFile);
    dsVehicles.write().saveAsTable("vehicles");

    spark.read().table("vehicles").show();
```

When this table is read and used in queries, the Substrait "ReadRel" will be a `NamedScan` instead; this refers to a table
`[spark_catalog, default, vehicles]` - `default` is the name of the default Spark database.

```
 +- NamedScan:: Tables=[spark_catalog, default, vehicles] Fields=vehicle_id[Str],make[Str],model[Str],colour[Str],fuel_type[Str],cylinder_capacity[Str],first_use_date[Str]
```

This plan can be consumed in exactly the same manner as the other plans; the only difference being, _if the table is not already present_ it will fail to execute. The plan no longer carries the source of the data, only a reference name and the expected fields. Provided the data is present in Spark, the query will execute without issue.

## Observations on LocalFiles/NamedScan

Including the information on the location of the data permits easy use of the plan. In the example here this worked well; however there could be difficulties depending on the recipient engine. Substrait as an intermediate form gives the ability to transfer plans between engines; how different engines catalogue their data will be relevant.

For example, the above plan can be handled with PyArrow or DuckDB (among a variety of other engines); the code for consuming the plans is straightforward.

```python
    with open(PLAN_FILE, "rb") as file:
        planbytes = file.read()
        reader = substrait.run_query(
            base64.b64decode(planbytes),
            table_provider=self.simple_provider,
        )
        result = reader.read_all()
```

When run with this plan, PyArrow immediately rejects it with

```
pyarrow.lib.ArrowNotImplementedError: non-default substrait::ReadRel::LocalFiles::FileOrFiles::length
```

DuckDB has a similar API `connection.from_substrait(planbytes)` and produces a different error

```
duckdb.duckdb.IOException: IO Error: No files found that match the pattern "file:///opt/spark-data/tests_subset_2023.csv"
```

This shows that different engines will potentially have different supported relations; PyArrow wants to delegate the loading of the data to the user, whereas DuckDB is happy to load files.
DuckDB though of course can only proceed with the information that it has, the URI of the file here is coupled to the location of the data on the originating engine. Something like a s3 uri could be potentially useful. + +Creating a plan from Spark but where the data is saved as table provides an alternative. Depending on the engine this can also need some careful handling. In the `NamedScan` above, the name was a list of 3 strings. `Tables=[spark_catalog, default, vehicles]`. Whilst DuckDB's implementation understands that these are referring to a table, its own catalogue can't be indexed with these three values. + +``` +duckdb.duckdb.CatalogException: Catalog Error: Table with name spark_catalog does not exist! +``` + +PyArrow takes a different approach in locating the data. In the PyArrow code above there is a reference to a `table_provider`; the job of 'providing a table' is delegated back to the user. + +Firstly we need to load the datasets to PyArrow datasets +```python + test = pq.read_table(TESTS_PQ_FILE) + vehicles = pq.read_table(VEHICLES_PQ_FILE) +``` + +We can define a `table_provider` function; this logs which table is being requested, but also what the expected schema is. +As names is a array, we can check the final part of the name and return the matching dataset. + +```python + def table_provider(self, names, schema): + print(f"== Requesting table {names} with schema: \n{schema}\n") + + if not names: + raise Exception("No names provided") + else: + if names[-1] == "tests": + return self.test + elif names[-1] == "vehicles": + return self.vehicles + + raise Exception(f"Unrecognized table name {names}") +``` + + +When run the output is along these lines (the query is slightly different here for simplicity); we can see the tables being requested and the schema expected. Nothing is done with the schema here but could be useful for ensuring that the expectations of the plan match the schema of the data held in the engine. + +``` +== Requesting table ['spark_catalog', 'default', 'vehicles'] with schema: +vehicle_id: string +make: string +model: string +colour: string +fuel_type: string +cylinder_capacity: string +first_use_date: string + +== Requesting table ['spark_catalog', 'default', 'tests'] with schema: +test_id: string +vehicle_id: string +test_date: string +test_class: string +test_type: string +test_result: string +test_mileage: string +postcode_area: string + + colour test_result +0 WHITE P +1 WHITE F +2 BLACK P +3 BLACK P +4 RED P +5 BLACK P +6 BLUE P +7 SILVER F +8 SILVER F +9 BLACK P +``` + +# Summary + +The Substrait intermediate representation of the query can be serialized via the protobuf format and transferred between engines of the same type or between different engines. + +In the case of Spark for example, identical plans can be created with the Spark SQL or the Spark Dataset API. +*Separately* this file can be loaded and the Substrait Plan converted to a Spark Plan. Assuming that the consuming engine has the same understanding of the reference to LocalFiles the plan can be read and executed. + +Logical references to a 'table' via a `NamedScan` gives more flexibility; but the structure of the reference still needs to be properly understood and agreed upon. + +Once common understanding is agreed upon, transferring plans between engines brings great flexibility and potential. 
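As a closing illustration, the pieces shown above can be combined into a single round trip. The sketch below is assembled from the calls used in the example classes; it assumes an existing `SparkSession` named `spark` and a query result `result` (for example from `spark.sql(...)`), and omits exception handling for brevity.

```java
import io.substrait.plan.PlanProtoConverter;
import io.substrait.plan.ProtoPlanConverter;
import io.substrait.spark.logical.ToLogicalPlan;
import io.substrait.spark.logical.ToSubstraitRel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;

// 1. Spark optimized plan -> Substrait plan (POJO)
LogicalPlan optimised = result.queryExecution().optimizedPlan();
io.substrait.plan.Plan plan = new ToSubstraitRel().convert(optimised);

// 2. Substrait plan -> protobuf bytes (the portable, serialized form)
byte[] buffer = new PlanProtoConverter().toProto(plan).toByteArray();

// 3. protobuf bytes -> Substrait plan (typically in a different process or engine)
io.substrait.plan.Plan reloaded =
    new ProtoPlanConverter().from(io.substrait.proto.Plan.parseFrom(buffer));

// 4. Substrait plan -> Spark logical plan, which can then be executed
LogicalPlan sparkPlan = new ToLogicalPlan(spark).convert(reloaded);
Dataset.ofRows(spark, sparkPlan).show();
```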
diff --git a/examples/substrait-spark/build.gradle.kts b/examples/substrait-spark/build.gradle.kts new file mode 100644 index 000000000..212f2b11c --- /dev/null +++ b/examples/substrait-spark/build.gradle.kts @@ -0,0 +1,40 @@ +plugins { + // Apply the application plugin to add support for building a CLI application in Java. + id("java") + id("com.diffplug.spotless") version "6.19.0" +} + +repositories { + // Use Maven Central for resolving dependencies. + mavenCentral() +} + +dependencies { + implementation("org.apache.spark:spark-core_2.12:3.5.1") + implementation("io.substrait:spark:0.36.0") + implementation("io.substrait:core:0.36.0") + implementation("org.apache.spark:spark-sql_2.12:3.5.1") + + // For a real Spark application, these would not be required since they would be in the Spark + // server classpath + runtimeOnly("org.apache.spark:spark-core_2.12:3.5.1") + runtimeOnly("org.apache.spark:spark-hive_2.12:3.5.1") +} + +tasks.jar { + isZip64 = true + exclude("META-INF/*.RSA") + exclude("META-INF/*.SF") + exclude("META-INF/*.DSA") + + duplicatesStrategy = DuplicatesStrategy.EXCLUDE + manifest.attributes["Main-Class"] = "io.substrait.examples.App" + from(configurations.runtimeClasspath.get().map({ if (it.isDirectory) it else zipTree(it) })) +} + +tasks.named("test") { + // Use JUnit Platform for unit tests. + useJUnitPlatform() +} + +java { toolchain { languageVersion.set(JavaLanguageVersion.of(17)) } } diff --git a/examples/substrait-spark/docker-compose.yaml b/examples/substrait-spark/docker-compose.yaml new file mode 100644 index 000000000..15252983e --- /dev/null +++ b/examples/substrait-spark/docker-compose.yaml @@ -0,0 +1,32 @@ +services: + spark: + image: docker.io/bitnami/spark:3.5 + user: ":${MY_GID}" + environment: + - SPARK_MODE=master + - SPARK_RPC_AUTHENTICATION_ENABLED=no + - SPARK_RPC_ENCRYPTION_ENABLED=no + - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no + - SPARK_SSL_ENABLED=no + - SPARK_USER=spark + ports: + - '8080:8080' + volumes: + - ./_apps:/opt/spark-apps + - ./_data:/opt/spark-data + spark-worker: + image: docker.io/bitnami/spark:3.5 + user: ":${MY_GID}" + environment: + - SPARK_MODE=worker + - SPARK_MASTER_URL=spark://spark:7077 + - SPARK_WORKER_MEMORY=1G + - SPARK_WORKER_CORES=1 + - SPARK_RPC_AUTHENTICATION_ENABLED=no + - SPARK_RPC_ENCRYPTION_ENABLED=no + - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no + - SPARK_SSL_ENABLED=no + - SPARK_USER=spark + volumes: + - ./_apps:/opt/spark-apps + - ./_data:/opt/spark-data diff --git a/examples/substrait-spark/justfile b/examples/substrait-spark/justfile new file mode 100644 index 000000000..37ed425cf --- /dev/null +++ b/examples/substrait-spark/justfile @@ -0,0 +1,60 @@ +# Main justfile to run all the development scripts +# To install 'just' see https://github.com/casey/just#installation + +# Ensure all properties are exported as shell env-vars +set export +set dotenv-load + +# set the current directory, and the location of the test dats +CWDIR := justfile_directory() + +SPARK_VERSION := "3.5.1" + +SPARK_MASTER_CONTAINER := "substrait-spark-spark-1" + +_default: + @just -f {{justfile()}} --list + +# Builds the application into a JAR file +buildapp: + #!/bin/bash + set -e -o pipefail + + ${CWDIR}/../../gradlew build + + # need to let the SPARK user be able to write to the _data mount + mkdir -p ${CWDIR}/_data && chmod g+w ${CWDIR}/_data + mkdir -p ${CWDIR}/_apps + + cp ${CWDIR}/build/libs/substrait-spark*.jar ${CWDIR}/_apps/app.jar + cp ${CWDIR}/src/main/resources/*.csv ${CWDIR}/_data + +# Runs a Spark dataset api query and 
produces a Substrait plan +dataset: + #!/bin/bash + set -e -o pipefail + + docker exec -it ${SPARK_MASTER_CONTAINER} bash -c "/opt/bitnami/spark/bin/spark-submit --master spark://${SPARK_MASTER_CONTAINER}:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar SparkDataset" + +# Runs a Spark SQL api query and produces a Substrait plan +sql: + #!/bin/bash + set -e -o pipefail + + docker exec -it ${SPARK_MASTER_CONTAINER} bash -c "/opt/bitnami/spark/bin/spark-submit --master spark://${SPARK_MASTER_CONTAINER}:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar SparkSQL" + +# Consumes the Substrait plan file passed as the argument +consume arg: + #!/bin/bash + set -e -o pipefail + + docker exec -it ${SPARK_MASTER_CONTAINER} bash -c "/opt/bitnami/spark/bin/spark-submit --master spark://${SPARK_MASTER_CONTAINER}:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar SparkConsumeSubstrait {{arg}}" + +# Starts a simple Spark cluster locally in docker +spark: + #!/bin/bash + set -e -o pipefail + + export MY_UID=$(id -u) + export MY_GID=$(id -g) + docker compose up diff --git a/examples/substrait-spark/src/main/java/io/substrait/examples/App.java b/examples/substrait-spark/src/main/java/io/substrait/examples/App.java new file mode 100644 index 000000000..87d7190f4 --- /dev/null +++ b/examples/substrait-spark/src/main/java/io/substrait/examples/App.java @@ -0,0 +1,45 @@ +package io.substrait.examples; + +/** Main class */ +public final class App { + + /** Implemented by all examples */ + public interface Action { + + /** + * Run + * + * @param arg argument + */ + void run(String arg); + } + + private App() {} + + /** + * Traditional main method + * + * @param args string[] + */ + public static void main(String args[]) { + try { + + if (args.length == 0) { + args = new String[] {"SparkDataset"}; + } + String exampleClass = args[0]; + + var clz = Class.forName(App.class.getPackageName() + "." 
+ exampleClass); + var action = (Action) clz.getDeclaredConstructor().newInstance(); + + if (args.length == 2) { + action.run(args[1]); + } else { + action.run(null); + } + + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/examples/substrait-spark/src/main/java/io/substrait/examples/SparkConsumeSubstrait.java b/examples/substrait-spark/src/main/java/io/substrait/examples/SparkConsumeSubstrait.java new file mode 100644 index 000000000..26c15274f --- /dev/null +++ b/examples/substrait-spark/src/main/java/io/substrait/examples/SparkConsumeSubstrait.java @@ -0,0 +1,46 @@ +package io.substrait.examples; + +import static io.substrait.examples.SparkHelper.ROOT_DIR; + +import io.substrait.examples.util.SubstraitStringify; +import io.substrait.plan.Plan; +import io.substrait.plan.ProtoPlanConverter; +import io.substrait.spark.logical.ToLogicalPlan; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; + +/** Minimal Spark application */ +public class SparkConsumeSubstrait implements App.Action { + + @Override + public void run(String arg) { + + // Connect to a local in-process Spark instance + try (SparkSession spark = SparkHelper.connectLocalSpark()) { + + System.out.println("Reading from " + arg); + byte[] buffer = Files.readAllBytes(Paths.get(ROOT_DIR, arg)); + + io.substrait.proto.Plan proto = io.substrait.proto.Plan.parseFrom(buffer); + ProtoPlanConverter protoToPlan = new ProtoPlanConverter(); + Plan plan = protoToPlan.from(proto); + + SubstraitStringify.explain(plan).forEach(System.out::println); + + ToLogicalPlan substraitConverter = new ToLogicalPlan(spark); + LogicalPlan sparkPlan = substraitConverter.convert(plan); + + System.out.println(sparkPlan); + + Dataset.ofRows(spark, sparkPlan).show(); + + spark.stop(); + } catch (IOException e) { + e.printStackTrace(); + } + } +} diff --git a/examples/substrait-spark/src/main/java/io/substrait/examples/SparkDataset.java b/examples/substrait-spark/src/main/java/io/substrait/examples/SparkDataset.java new file mode 100644 index 000000000..81de54b0b --- /dev/null +++ b/examples/substrait-spark/src/main/java/io/substrait/examples/SparkDataset.java @@ -0,0 +1,85 @@ +package io.substrait.examples; + +import static io.substrait.examples.SparkHelper.ROOT_DIR; +import static io.substrait.examples.SparkHelper.TESTS_CSV; +import static io.substrait.examples.SparkHelper.VEHICLES_CSV; + +import io.substrait.examples.util.SubstraitStringify; +import io.substrait.plan.PlanProtoConverter; +import io.substrait.spark.logical.ToSubstraitRel; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; + +/** Minimal Spark application */ +public class SparkDataset implements App.Action { + + @Override + public void run(String arg) { + + // Connect to a local in-process Spark instance + try (SparkSession spark = SparkHelper.connectLocalSpark()) { + + Dataset dsVehicles; + Dataset dsTests; + + // load from CSV files + String vehiclesFile = Paths.get(ROOT_DIR, VEHICLES_CSV).toString(); + String testsFile = Paths.get(ROOT_DIR, TESTS_CSV).toString(); + + System.out.println("Reading " + vehiclesFile); + System.out.println("Reading " + testsFile); + + dsVehicles = 
spark.read().option("delimiter", ",").option("header", "true").csv(vehiclesFile); + dsVehicles.show(); + + dsTests = spark.read().option("delimiter", ",").option("header", "true").csv(testsFile); + dsTests.show(); + + // created the joined dataset + Dataset joinedDs = + dsVehicles + .join(dsTests, dsVehicles.col("vehicle_id").equalTo(dsTests.col("vehicle_id"))) + .filter(dsTests.col("test_result").equalTo("P")) + .groupBy(dsVehicles.col("colour")) + .count(); + + joinedDs = joinedDs.orderBy(joinedDs.col("count")); + joinedDs.show(); + + LogicalPlan plan = joinedDs.queryExecution().optimizedPlan(); + + System.out.println(plan); + createSubstrait(plan); + + spark.stop(); + } catch (Exception e) { + e.printStackTrace(System.out); + } + } + + /** + * Create substrait plan and save to file based on logical plan + * + * @param enginePlan logical plan + */ + public void createSubstrait(LogicalPlan enginePlan) { + ToSubstraitRel toSubstrait = new ToSubstraitRel(); + io.substrait.plan.Plan plan = toSubstrait.convert(enginePlan); + + SubstraitStringify.explain(plan).forEach(System.out::println); + + PlanProtoConverter planToProto = new PlanProtoConverter(); + byte[] buffer = planToProto.toProto(plan).toByteArray(); + try { + Files.write(Paths.get(ROOT_DIR, "spark_dataset_substrait.plan"), buffer); + System.out.println("File written to " + Paths.get(ROOT_DIR, "spark_sql_substrait.plan")); + } catch (IOException e) { + e.printStackTrace(System.out); + } + } +} diff --git a/examples/substrait-spark/src/main/java/io/substrait/examples/SparkHelper.java b/examples/substrait-spark/src/main/java/io/substrait/examples/SparkHelper.java new file mode 100644 index 000000000..4f77a9c1c --- /dev/null +++ b/examples/substrait-spark/src/main/java/io/substrait/examples/SparkHelper.java @@ -0,0 +1,38 @@ +package io.substrait.examples; + +import org.apache.spark.sql.SparkSession; + +/** Collection of helper fns */ +public final class SparkHelper { + + private SparkHelper() {} + + /** Vehicles table */ + public static final String VEHICLE_TABLE = "vehicles"; + + /** Tests table (the vehicle safety tests) */ + public static final String TESTS_TABLE = "tests"; + + /** Source data - csv */ + public static final String VEHICLES_CSV = "vehicles_subset_2023.csv"; + + /** Source data - csv */ + public static final String TESTS_CSV = "tests_subset_2023.csv"; + + /** In-container data location */ + public static final String ROOT_DIR = "/opt/spark-data"; + + /** + * Connects to the local spark cluister + * + * @return SparkSession + */ + public static SparkSession connectLocalSpark() { + + SparkSession spark = SparkSession.builder().enableHiveSupport().getOrCreate(); + + spark.sparkContext().setLogLevel("ERROR"); + + return spark; + } +} diff --git a/examples/substrait-spark/src/main/java/io/substrait/examples/SparkSQL.java b/examples/substrait-spark/src/main/java/io/substrait/examples/SparkSQL.java new file mode 100644 index 000000000..fc4c8fec4 --- /dev/null +++ b/examples/substrait-spark/src/main/java/io/substrait/examples/SparkSQL.java @@ -0,0 +1,93 @@ +package io.substrait.examples; + +import static io.substrait.examples.SparkHelper.ROOT_DIR; +import static io.substrait.examples.SparkHelper.TESTS_CSV; +import static io.substrait.examples.SparkHelper.TESTS_TABLE; +import static io.substrait.examples.SparkHelper.VEHICLES_CSV; +import static io.substrait.examples.SparkHelper.VEHICLE_TABLE; + +import io.substrait.examples.util.SubstraitStringify; +import io.substrait.plan.PlanProtoConverter; +import 
io.substrait.spark.logical.ToSubstraitRel; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; + +/** Minimal Spark application */ +public class SparkSQL implements App.Action { + + @Override + public void run(String arg) { + + // Connect to a local in-process Spark instance + try (SparkSession spark = SparkHelper.connectLocalSpark()) { + spark.catalog().listDatabases().show(); + + // load from CSV files + String vehiclesFile = Paths.get(ROOT_DIR, VEHICLES_CSV).toString(); + String testsFile = Paths.get(ROOT_DIR, TESTS_CSV).toString(); + + System.out.println("Reading " + vehiclesFile); + System.out.println("Reading " + testsFile); + + spark + .read() + .option("delimiter", ",") + .option("header", "true") + .csv(vehiclesFile) + .createOrReplaceTempView(VEHICLE_TABLE); + spark + .read() + .option("delimiter", ",") + .option("header", "true") + .csv(testsFile) + .createOrReplaceTempView(TESTS_TABLE); + + String sqlQuery = + "SELECT vehicles.colour, count(*) as colourcount" + + " FROM vehicles" + + " INNER JOIN tests ON vehicles.vehicle_id=tests.vehicle_id" + + " WHERE tests.test_result = 'P'" + + " GROUP BY vehicles.colour" + + " ORDER BY count(*)"; + + var result = spark.sql(sqlQuery); + result.show(); + + LogicalPlan logical = result.logicalPlan(); + System.out.println(logical); + + LogicalPlan optimised = result.queryExecution().optimizedPlan(); + System.out.println(optimised); + + createSubstrait(optimised); + spark.stop(); + } catch (Exception e) { + e.printStackTrace(System.out); + } + } + + /** + * creates a substrait plan based on the logical plan + * + * @param enginePlan Spark Local PLan + */ + public void createSubstrait(LogicalPlan enginePlan) { + ToSubstraitRel toSubstrait = new ToSubstraitRel(); + io.substrait.plan.Plan plan = toSubstrait.convert(enginePlan); + + SubstraitStringify.explain(plan).forEach(System.out::println); + + PlanProtoConverter planToProto = new PlanProtoConverter(); + byte[] buffer = planToProto.toProto(plan).toByteArray(); + try { + Files.write(Paths.get(ROOT_DIR, "spark_sql_substrait.plan"), buffer); + System.out.println("File written to " + Paths.get(ROOT_DIR, "spark_sql_substrait.plan")); + + } catch (IOException e) { + e.printStackTrace(); + } + } +} diff --git a/examples/substrait-spark/src/main/java/io/substrait/examples/util/ExpressionStringify.java b/examples/substrait-spark/src/main/java/io/substrait/examples/util/ExpressionStringify.java new file mode 100644 index 000000000..e8630200e --- /dev/null +++ b/examples/substrait-spark/src/main/java/io/substrait/examples/util/ExpressionStringify.java @@ -0,0 +1,276 @@ +package io.substrait.examples.util; + +import io.substrait.expression.Expression.BinaryLiteral; +import io.substrait.expression.Expression.BoolLiteral; +import io.substrait.expression.Expression.Cast; +import io.substrait.expression.Expression.DateLiteral; +import io.substrait.expression.Expression.DecimalLiteral; +import io.substrait.expression.Expression.EmptyListLiteral; +import io.substrait.expression.Expression.FP32Literal; +import io.substrait.expression.Expression.FP64Literal; +import io.substrait.expression.Expression.FixedBinaryLiteral; +import io.substrait.expression.Expression.FixedCharLiteral; +import io.substrait.expression.Expression.I16Literal; +import io.substrait.expression.Expression.I32Literal; +import io.substrait.expression.Expression.I64Literal; +import 
io.substrait.expression.Expression.I8Literal; +import io.substrait.expression.Expression.IfThen; +import io.substrait.expression.Expression.InPredicate; +import io.substrait.expression.Expression.IntervalDayLiteral; +import io.substrait.expression.Expression.IntervalYearLiteral; +import io.substrait.expression.Expression.ListLiteral; +import io.substrait.expression.Expression.MapLiteral; +import io.substrait.expression.Expression.MultiOrList; +import io.substrait.expression.Expression.NullLiteral; +import io.substrait.expression.Expression.ScalarFunctionInvocation; +import io.substrait.expression.Expression.ScalarSubquery; +import io.substrait.expression.Expression.SetPredicate; +import io.substrait.expression.Expression.SingleOrList; +import io.substrait.expression.Expression.StrLiteral; +import io.substrait.expression.Expression.StructLiteral; +import io.substrait.expression.Expression.Switch; +import io.substrait.expression.Expression.TimeLiteral; +import io.substrait.expression.Expression.TimestampLiteral; +import io.substrait.expression.Expression.TimestampTZLiteral; +import io.substrait.expression.Expression.UUIDLiteral; +import io.substrait.expression.Expression.UserDefinedLiteral; +import io.substrait.expression.Expression.VarCharLiteral; +import io.substrait.expression.Expression.WindowFunctionInvocation; +import io.substrait.expression.ExpressionVisitor; +import io.substrait.expression.FieldReference; + +/** ExpressionStringify gives a simple debug text output for Expressions */ +public class ExpressionStringify extends ParentStringify + implements ExpressionVisitor { + + public ExpressionStringify(int indent) { + super(indent); + } + + @Override + public String visit(NullLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(BoolLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(I8Literal expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(I16Literal expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(I32Literal expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(I64Literal expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(FP32Literal expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(FP64Literal expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(StrLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(BinaryLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(TimeLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(DateLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(TimestampLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(TimestampTZLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(IntervalYearLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(IntervalDayLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(UUIDLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(FixedCharLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(VarCharLiteral expr) throws RuntimeException { + return ""; + } + + 
@Override + public String visit(FixedBinaryLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(DecimalLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(MapLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(ListLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(EmptyListLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(StructLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(UserDefinedLiteral expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(Switch expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(IfThen expr) throws RuntimeException { + return ""; + } + + @Override + public String visit(ScalarFunctionInvocation expr) throws RuntimeException { + var sb = new StringBuilder(""); + + sb.append(expr.declaration()); + // sb.append(" ("); + var args = expr.arguments(); + for (var i = 0; i < args.size(); i++) { + var arg = args.get(i); + sb.append(getContinuationIndentString()); + sb.append("arg" + i + " = "); + var funcArgVisitor = new FunctionArgStringify(indent); + + sb.append(arg.accept(expr.declaration(), i, funcArgVisitor)); + sb.append(" "); + } + return sb.toString(); + } + + @Override + public String visit(WindowFunctionInvocation expr) throws RuntimeException { + var sb = new StringBuilder("WindowFunctionInvocation#"); + + return sb.toString(); + } + + @Override + public String visit(Cast expr) throws RuntimeException { + var sb = new StringBuilder(""); + return sb.toString(); + } + + @Override + public String visit(SingleOrList expr) throws RuntimeException { + var sb = new StringBuilder("SingleOrList#"); + + return sb.toString(); + } + + @Override + public String visit(MultiOrList expr) throws RuntimeException { + var sb = new StringBuilder("Cast#"); + + return sb.toString(); + } + + @Override + public String visit(FieldReference expr) throws RuntimeException { + StringBuilder sb = new StringBuilder("FieldRef#"); + var type = expr.getType(); + // sb.append(expr.inputExpression()); + sb.append("/").append(type.accept(new TypeStringify(indent))).append("/"); + expr.segments() + .forEach( + s -> { + sb.append(s).append(" "); + }); + + return sb.toString(); + } + + @Override + public String visit(SetPredicate expr) throws RuntimeException { + var sb = new StringBuilder("SetPredicate#"); + + return sb.toString(); + } + + @Override + public String visit(ScalarSubquery expr) throws RuntimeException { + var sb = new StringBuilder("ScalarSubquery#"); + + return sb.toString(); + } + + @Override + public String visit(InPredicate expr) throws RuntimeException { + var sb = new StringBuilder("InPredicate#"); + + return sb.toString(); + } +} diff --git a/examples/substrait-spark/src/main/java/io/substrait/examples/util/FunctionArgStringify.java b/examples/substrait-spark/src/main/java/io/substrait/examples/util/FunctionArgStringify.java new file mode 100644 index 000000000..bcaf6dc1e --- /dev/null +++ b/examples/substrait-spark/src/main/java/io/substrait/examples/util/FunctionArgStringify.java @@ -0,0 +1,31 @@ +package io.substrait.examples.util; + +import io.substrait.expression.EnumArg; +import io.substrait.expression.Expression; +import io.substrait.expression.FunctionArg.FuncArgVisitor; +import io.substrait.extension.SimpleExtension.Function; +import io.substrait.type.Type; + 
+/** FunctionArgStringify produces a simple debug string for Function Arguments */ +public class FunctionArgStringify extends ParentStringify + implements FuncArgVisitor { + + public FunctionArgStringify(int indent) { + super(indent); + } + + @Override + public String visitExpr(Function fnDef, int argIdx, Expression e) throws RuntimeException { + return e.accept(new ExpressionStringify(indent + 1)); + } + + @Override + public String visitType(Function fnDef, int argIdx, Type t) throws RuntimeException { + return t.accept(new TypeStringify(indent)); + } + + @Override + public String visitEnumArg(Function fnDef, int argIdx, EnumArg e) throws RuntimeException { + return e.toString(); + } +} diff --git a/examples/substrait-spark/src/main/java/io/substrait/examples/util/ParentStringify.java b/examples/substrait-spark/src/main/java/io/substrait/examples/util/ParentStringify.java new file mode 100644 index 000000000..30cc30a68 --- /dev/null +++ b/examples/substrait-spark/src/main/java/io/substrait/examples/util/ParentStringify.java @@ -0,0 +1,57 @@ +package io.substrait.examples.util; + +/** + * Parent class of all stringifiers Created as it seemed there could be an optimization to share + * formatting fns between the various stringifiers + */ +public class ParentStringify { + + protected String indentChar = " "; + protected int indent = 0; + protected int indentSize = 3; + + /** + * Build with a specific indent at the start - note 'an indent' is set by default to be 3 spaces. + * + * @param indent number of indentes + */ + public ParentStringify(int indent) { + this.indent = indent; + } + + StringBuilder getIndent() { + + var sb = new StringBuilder(); + if (indent != 0) { + sb.append("\n"); + } + sb.append(getIndentString()); + + indent++; + return sb; + } + + StringBuilder getIndentString() { + + var sb = new StringBuilder(); + sb.append(indentChar.repeat(this.indent * this.indentSize)); + sb.append("+- "); + return sb; + } + + StringBuilder getContinuationIndentString() { + + var sb = new StringBuilder(); + if (indent != 0) { + sb.append("\n"); + } + sb.append(indentChar.repeat(this.indent * this.indentSize)); + sb.append(" : "); + return sb; + } + + protected String getOutdent(StringBuilder sb) { + indent--; + return (sb).toString(); + } +} diff --git a/examples/substrait-spark/src/main/java/io/substrait/examples/util/SubstraitStringify.java b/examples/substrait-spark/src/main/java/io/substrait/examples/util/SubstraitStringify.java new file mode 100644 index 000000000..f650e8f62 --- /dev/null +++ b/examples/substrait-spark/src/main/java/io/substrait/examples/util/SubstraitStringify.java @@ -0,0 +1,339 @@ +package io.substrait.examples.util; + +import io.substrait.relation.Aggregate; +import io.substrait.relation.ConsistentPartitionWindow; +import io.substrait.relation.Cross; +import io.substrait.relation.EmptyScan; +import io.substrait.relation.ExtensionLeaf; +import io.substrait.relation.ExtensionMulti; +import io.substrait.relation.ExtensionSingle; +import io.substrait.relation.ExtensionTable; +import io.substrait.relation.Fetch; +import io.substrait.relation.Filter; +import io.substrait.relation.Join; +import io.substrait.relation.LocalFiles; +import io.substrait.relation.NamedScan; +import io.substrait.relation.Project; +import io.substrait.relation.Rel; +import io.substrait.relation.RelVisitor; +import io.substrait.relation.Set; +import io.substrait.relation.Sort; +import io.substrait.relation.VirtualTableScan; +import io.substrait.relation.physical.HashJoin; +import 
io.substrait.relation.physical.MergeJoin; +import io.substrait.relation.physical.NestedLoopJoin; +import io.substrait.type.NamedStruct; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * SubstraitStringify produces a string format output of the Substrait plan or relation + * + *

 * This is intended for debug and development purposes only, and follows a similar style to the
 * `explain` API in libraries such as Spark, Calcite etc.
 *
 * Usage:
 *
 *   io.substrait.plan.Plan plan = toSubstrait.convert(enginePlan);
 *   SubstraitStringify.explain(plan).forEach(System.out::println);
 *
 * There is scope for improving this output; there are some gaps in the lesser-used relations. This
 * is not a replacement for any canonical form and is only for ease of debugging.

TODO: https://github.com/substrait-io/substrait-java/issues/302 which tracks the full + * implementation of this + */ +public class SubstraitStringify extends ParentStringify + implements RelVisitor { + + public SubstraitStringify() { + super(0); + } + + /** + * Explains the Substrait plan + * + * @param plan Substrait plan + * @return List of strings; typically these would then be logged or sent to stdout + */ + public static List explain(io.substrait.plan.Plan plan) { + var explanations = new ArrayList(); + explanations.add(""); + + plan.getRoots() + .forEach( + root -> { + var rel = root.getInput(); + + explanations.add("Root:: " + rel.getClass().getSimpleName() + " " + root.getNames()); + explanations.addAll(explain(rel)); + }); + + return explanations; + } + + /** + * Explains the Substrait relation + * + * @param rel Substrait relation + * @return List of strings; typically these would then be logged or sent to stdout + */ + public static List explain(io.substrait.relation.Rel rel) { + var s = new SubstraitStringify(); + + List explanation = new ArrayList(); + explanation.add(""); + explanation.addAll(Arrays.asList(rel.accept(s).split("\n"))); + return explanation; + } + + private boolean showRemap = false; + + private List fieldList(List fields) { + return fields.stream().map(t -> t.accept(new TypeStringify(0))).collect(Collectors.toList()); + } + + private String getRemap(Rel rel) { + if (!showRemap) { + return ""; + } + var fieldCount = rel.getRecordType().fields().size(); + var remap = rel.getRemap(); + var recordType = fieldList(rel.getRecordType().fields()); + + if (remap.isPresent()) { + return "/Remapping fields (" + + fieldCount + + ") " + + remap.get().indices() + + " as " + + recordType + + "/ "; + } else { + return "/No Remap (" + fieldCount + ") " + recordType + "/ "; + } + } + + @Override + public String visit(Aggregate aggregate) throws RuntimeException { + StringBuilder sb = getIndent().append("Aggregate:: ").append(getRemap(aggregate)); + aggregate + .getGroupings() + .forEach( + g -> { + g.getExpressions() + .forEach( + expr -> { + sb.append(expr.accept(new ExpressionStringify(this.indent))); + }); + }); + aggregate + .getInputs() + .forEach( + s -> { + sb.append(s.accept(this)); + }); + aggregate.getRemap().ifPresent(s -> sb.append(s.toString())); + + return getOutdent(sb); + } + + @Override + public String visit(EmptyScan emptyScan) throws RuntimeException { + var sb = new StringBuilder("EmptyScan:: ").append(getRemap(emptyScan)); + // sb.append(emptyScan.accept(this)); + return getOutdent(sb); + } + + @Override + public String visit(Fetch fetch) throws RuntimeException { + var sb = new StringBuilder("Fetch:: "); + // sb.append(fetch.accept(this)); + return getOutdent(sb); + } + + @Override + public String visit(Filter filter) throws RuntimeException { + var sb = getIndent().append("Filter:: ").append(getRemap(filter)); + // .append("{ "); + sb.append(filter.getCondition().accept(new ExpressionStringify(indent))) /* .append(")") */; + filter + .getInputs() + .forEach( + i -> { + sb.append(i.accept(this)); + }); + + return getOutdent(sb); + } + + @Override + public String visit(Join join) throws RuntimeException { + + var sb = + getIndent().append("Join:: ").append(join.getJoinType()).append(" ").append(getRemap(join)); + + if (join.getCondition().isPresent()) { + sb.append(join.getCondition().get().accept(new ExpressionStringify(indent))); + } + + sb.append(join.getLeft().accept(this)); + sb.append(join.getRight().accept(this)); + + return getOutdent(sb); + } + + 
@Override + public String visit(Set set) throws RuntimeException { + StringBuilder sb = getIndent().append("Set:: "); + return getOutdent(sb); + } + + @Override + public String visit(NamedScan namedScan) throws RuntimeException { + + StringBuilder sb = getIndent().append("NamedScan:: ").append(getRemap(namedScan)); + namedScan + .getInputs() + .forEach( + i -> { + sb.append(i.accept(this)); + }); + sb.append(" Tables="); + sb.append(namedScan.getNames()); + sb.append(" Fields="); + sb.append(namedStruct(namedScan.getInitialSchema())); + return getOutdent(sb); + } + + private String namedStruct(NamedStruct struct) { + var sb = new StringBuilder(); + + var names = struct.names(); + var types = fieldList(struct.struct().fields()); + + for (var x = 0; x < names.size(); x++) { + if (x != 0) { + sb.append(","); + } + sb.append(names.get(x)).append("[").append(types.get(x)).append("]"); + } + + return sb.toString(); + } + + @Override + public String visit(LocalFiles localFiles) throws RuntimeException { + StringBuilder sb = getIndent().append("LocalFiles:: "); + + for (var i : localFiles.getItems()) { + sb.append(getContinuationIndentString()); + var fileFormat = ""; + if (i.getFileFormat().isPresent()) { + fileFormat = i.getFileFormat().get().toString(); + } + + sb.append( + String.format( + "%s %s len=%d partition=%d start=%d", + fileFormat, i.getPath().get(), i.getLength(), i.getPartitionIndex(), i.getStart())); + } + + return getOutdent(sb); + } + + @Override + public String visit(Project project) throws RuntimeException { + StringBuilder sb = getIndent().append("Project:: ").append(getRemap(project)); + + sb.append(fieldList(project.deriveRecordType().fields())); + + var inputs = project.getInputs(); + inputs.forEach( + i -> { + sb.append(i.accept(this)); + }); + return getOutdent(sb); + } + + @Override + public String visit(Sort sort) throws RuntimeException { + StringBuilder sb = getIndent().append("Sort:: ").append(getRemap(sort)); + sort.getSortFields() + .forEach( + sf -> { + var expr = new ExpressionStringify(indent); + sb.append(sf.expr().accept(expr)).append(" ").append(sf.direction()); + }); + var inputs = sort.getInputs(); + inputs.forEach( + i -> { + sb.append(i.accept(this)); + }); + return getOutdent(sb); + } + + @Override + public String visit(Cross cross) throws RuntimeException { + StringBuilder sb = getIndent().append("Cross:: "); + return getOutdent(sb); + } + + @Override + public String visit(VirtualTableScan virtualTableScan) throws RuntimeException { + StringBuilder sb = getIndent().append("VirtualTableScan:: "); + return getOutdent(sb); + } + + @Override + public String visit(ExtensionLeaf extensionLeaf) throws RuntimeException { + StringBuilder sb = getIndent().append("extensionLeaf:: "); + return getOutdent(sb); + } + + @Override + public String visit(ExtensionSingle extensionSingle) throws RuntimeException { + StringBuilder sb = getIndent().append("extensionSingle:: "); + return getOutdent(sb); + } + + @Override + public String visit(ExtensionMulti extensionMulti) throws RuntimeException { + StringBuilder sb = getIndent().append("extensionMulti:: "); + return getOutdent(sb); + } + + @Override + public String visit(ExtensionTable extensionTable) throws RuntimeException { + StringBuilder sb = getIndent().append("extensionTable:: "); + return getOutdent(sb); + } + + @Override + public String visit(HashJoin hashJoin) throws RuntimeException { + StringBuilder sb = getIndent().append("hashJoin:: "); + return getOutdent(sb); + } + + @Override + public String visit(MergeJoin 
mergeJoin) throws RuntimeException { + StringBuilder sb = getIndent().append("mergeJoin:: "); + return getOutdent(sb); + } + + @Override + public String visit(NestedLoopJoin nestedLoopJoin) throws RuntimeException { + StringBuilder sb = getIndent().append("nestedLoopJoin:: "); + return getOutdent(sb); + } + + @Override + public String visit(ConsistentPartitionWindow consistentPartitionWindow) throws RuntimeException { + StringBuilder sb = getIndent().append("consistentPartitionWindow:: "); + return getOutdent(sb); + } +} diff --git a/examples/substrait-spark/src/main/java/io/substrait/examples/util/TypeStringify.java b/examples/substrait-spark/src/main/java/io/substrait/examples/util/TypeStringify.java new file mode 100644 index 000000000..796e9ca6b --- /dev/null +++ b/examples/substrait-spark/src/main/java/io/substrait/examples/util/TypeStringify.java @@ -0,0 +1,175 @@ +package io.substrait.examples.util; + +import io.substrait.type.Type; +import io.substrait.type.Type.Binary; +import io.substrait.type.Type.Bool; +import io.substrait.type.Type.Date; +import io.substrait.type.Type.Decimal; +import io.substrait.type.Type.FP32; +import io.substrait.type.Type.FP64; +import io.substrait.type.Type.FixedBinary; +import io.substrait.type.Type.FixedChar; +import io.substrait.type.Type.I16; +import io.substrait.type.Type.I32; +import io.substrait.type.Type.I64; +import io.substrait.type.Type.I8; +import io.substrait.type.Type.IntervalDay; +import io.substrait.type.Type.IntervalYear; +import io.substrait.type.Type.ListType; +import io.substrait.type.Type.Map; +import io.substrait.type.Type.Str; +import io.substrait.type.Type.Struct; +import io.substrait.type.Type.Time; +import io.substrait.type.Type.Timestamp; +import io.substrait.type.Type.TimestampTZ; +import io.substrait.type.Type.UUID; +import io.substrait.type.Type.UserDefined; +import io.substrait.type.Type.VarChar; +import io.substrait.type.TypeVisitor; + +/** TypeStringify produces a simple debug string of Substrait types */ +public class TypeStringify extends ParentStringify + implements TypeVisitor { + + protected TypeStringify(int indent) { + super(indent); + } + + @Override + public String visit(I64 type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(Bool type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(I8 type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(I16 type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(I32 type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(FP32 type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(FP64 type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(Str type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(Binary type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(Date type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(Time type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + @Deprecated + public String visit(TimestampTZ type) throws RuntimeException { + 
return type.getClass().getSimpleName(); + } + + @Override + @Deprecated + public String visit(Timestamp type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(Type.PrecisionTimestamp type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(Type.PrecisionTimestampTZ type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(IntervalYear type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(IntervalDay type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(UUID type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(FixedChar type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(VarChar type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(FixedBinary type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(Decimal type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(Struct type) throws RuntimeException { + var sb = new StringBuffer(type.getClass().getSimpleName()); + type.fields() + .forEach( + f -> { + sb.append(f.accept(this)); + }); + return sb.toString(); + } + + @Override + public String visit(ListType type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(Map type) throws RuntimeException { + return type.getClass().getSimpleName(); + } + + @Override + public String visit(UserDefined type) throws RuntimeException { + return type.getClass().getSimpleName(); + } +} diff --git a/examples/substrait-spark/src/main/resources/tests_subset_2023.csv b/examples/substrait-spark/src/main/resources/tests_subset_2023.csv new file mode 100644 index 000000000..762d53491 --- /dev/null +++ b/examples/substrait-spark/src/main/resources/tests_subset_2023.csv @@ -0,0 +1,30 @@ +test_id,vehicle_id,test_date,test_class,test_type,test_result,test_mileage,postcode_area +539514409,17113014,2023-01-09,4,NT,F,69934,PA +1122718877,986649781,2023-01-16,4,NT,F,57376,SG +1104881351,424684356,2023-03-06,4,NT,F,81853,SG +1487493049,1307056703,2023-03-07,4,NT,P,20763,SA +1107861883,130747047,2023-03-27,4,RT,P,125910,SA +472789285,777757523,2023-03-29,4,NT,P,68399,CO +1105082521,840180863,2023-04-15,4,NT,P,54240,NN +1172953135,917255260,2023-04-27,4,NT,P,60918,SM +127807783,888103385,2023-05-08,4,NT,P,112090,EH +1645970709,816803134,2023-06-03,4,NT,P,134858,RG +1355347761,919820431,2023-06-21,4,NT,P,37336,ST +1750209849,544950855,2023-06-23,4,NT,F,120034,NR +1376930435,439876988,2023-07-19,4,NT,P,109927,PO +582729949,1075446447,2023-07-19,4,NT,P,72986,SA +127953451,105663799,2023-07-31,4,NT,F,35824,ME +759291679,931759350,2023-08-07,4,NT,P,65353,DY +1629819891,335780567,2023-08-08,4,NT,PRS,103365,CF +1120026477,1153361746,2023-08-11,4,NT,P,286881,RM +1331300969,644861283,2023-08-15,4,NT,P,52173,LE +990694587,449899992,2023-08-16,4,NT,F,124891,SA +193460599,759696266,2023-08-29,4,NT,P,83554,LU +1337337679,1110416764,2023-10-09,4,NT,PRS,71093,SS +1885237527,137785384,2023-11-04,4,NT,P,88730,BH +1082642803,1291985882,2023-11-15,4,NT,PRS,160717,BA 
+896066743,615735063,2023-11-15,4,RT,P,107710,NR +1022666841,474362449,2023-11-20,4,NT,P,56296,HP +1010400923,1203222226,2023-12-04,4,NT,F,89255,TW +866705687,605696575,2023-12-06,4,NT,P,14674,YO +621751843,72093448,2023-12-14,4,NT,F,230280,TR diff --git a/examples/substrait-spark/src/main/resources/vehicles_subset_2023.csv b/examples/substrait-spark/src/main/resources/vehicles_subset_2023.csv new file mode 100644 index 000000000..087b54c84 --- /dev/null +++ b/examples/substrait-spark/src/main/resources/vehicles_subset_2023.csv @@ -0,0 +1,31 @@ +vehicle_id,make,model,colour,fuel_type,cylinder_capacity,first_use_date +17113014,VAUXHALL,VIVARO,BLACK,DI,1995,2011-09-29 +986649781,VAUXHALL,INSIGNIA,WHITE,DI,1956,2017-07-19 +424684356,RENAULT,GRAND SCENIC,GREY,PE,1997,2010-07-19 +1307056703,RENAULT,CLIO,BLACK,DI,1461,2014-05-30 +130747047,FORD,FOCUS,SILVER,DI,1560,2013-07-10 +777757523,HYUNDAI,I10,WHITE,PE,998,2016-05-21 +840180863,BMW,1 SERIES,WHITE,PE,2979,2016-03-11 +917255260,VAUXHALL,ASTRA,WHITE,PE,1364,2012-04-21 +888103385,FORD,GALAXY,SILVER,DI,1997,2014-09-12 +816803134,FORD,FIESTA,BLUE,PE,1299,2002-10-24 +697184031,BMW,X1,WHITE,DI,1995,2016-03-31 +919820431,TOYOTA,AURIS,BRONZE,PE,1329,2015-06-29 +544950855,VAUXHALL,ASTRA,RED,DI,1956,2012-09-17 +439876988,MINI,MINI,GREEN,PE,1598,2010-03-31 +1075446447,CITROEN,C4,RED,DI,1560,2015-10-05 +105663799,RENAULT,KADJAR,BLACK,PE,1332,2020-07-23 +931759350,FIAT,DUCATO,WHITE,DI,2199,2008-04-18 +335780567,HYUNDAI,I20,BLUE,PE,1396,2013-08-13 +1153361746,TOYOTA,PRIUS,SILVER,HY,1800,2010-06-23 +644861283,FORD,FIESTA,BLACK,PE,998,2015-09-03 +449899992,BMW,3 SERIES,GREEN,DI,2926,2006-09-30 +759696266,CITROEN,C4,BLUE,DI,1997,2011-12-19 +1110416764,CITROEN,XSARA,SILVER,DI,1997,1999-06-30 +137785384,MINI,MINI,GREY,DI,1598,2011-11-29 +1291985882,LAND ROVER,DEFENDER,BLUE,DI,2495,2002-06-12 +615735063,VOLKSWAGEN,CADDY,WHITE,DI,1598,2013-03-01 +474362449,VAUXHALL,GRANDLAND,GREY,PE,1199,2018-11-12 +1203222226,VAUXHALL,ASTRA,BLUE,PE,1598,2010-06-03 +605696575,SUZUKI,SWIFT SZ-T DUALJET MHEV CVT,RED,HY,1197,2020-12-18 +72093448,AUDI,A4,SILVER,DI,1896,2001-03-19 diff --git a/readme.md b/readme.md index 4ae53a2fc..d527584a9 100644 --- a/readme.md +++ b/readme.md @@ -33,5 +33,11 @@ SLF4J(W): Defaulting to no-operation (NOP) logger implementation SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for further details. ``` +## Examples + +The [examples](./examples) folder contains examples of using Substrait with Java; please check each example for specific details of its requirements and how to run it. The examples are intended to be tested within the GitHub workflow; depending on the setup required, it may only be possible to validate compilation. + +- [Substrait-Spark](./examples/substrait-spark/README.md): Using Substrait to produce and consume plans within Apache Spark + ## Getting Involved To learn more, head over [Substrait](https://substrait.io/), our parent project and join our [community](https://substrait.io/community/) diff --git a/settings.gradle.kts b/settings.gradle.kts index 224c6b509..013449786 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,6 +1,6 @@ rootProject.name = "substrait" -include("bom", "core", "isthmus", "isthmus-cli", "spark") +include("bom", "core", "isthmus", "isthmus-cli", "spark", "examples:substrait-spark") pluginManagement { plugins {