Commit

[SQL] Dataset et al
jaceklaskowski committed Aug 17, 2016
1 parent 175989d commit 37ae547
Showing 8 changed files with 101 additions and 35 deletions.
2 changes: 1 addition & 1 deletion SUMMARY.adoc
@@ -188,6 +188,7 @@
... link:spark-sql-SQLConf.adoc[SQLConf]
... link:spark-sql-Catalog.adoc[Catalog]
... link:spark-sql-dataset.adoc[Dataset]
.... link:spark-sql-logical-plan.adoc[LogicalPlan]
.... link:spark-sql-Encoder.adoc[Encoder]
.... link:spark-sql-columns.adoc[Columns]
.... link:spark-sql-dataframe.adoc[DataFrame (Dataset[Row\])]
@@ -240,7 +241,6 @@
..... link:spark-sql-catalyst-vectorized-parquet-decoder.adoc[Vectorized Parquet Decoder]
..... link:spark-sql-query-plan.adoc[QueryPlan]
..... link:spark-sql-spark-plan.adoc[SparkPlan]
..... link:spark-sql-logical-plan.adoc[LogicalPlan]
..... link:spark-sql-queryplanner.adoc[QueryPlanner]
..... link:spark-sql-query-execution.adoc[QueryExecution]
..... link:spark-sql-whole-stage-codegen.adoc[Whole-Stage Code Generation]
6 changes: 3 additions & 3 deletions spark-sparkcontext.adoc
@@ -264,13 +264,13 @@ CAUTION: FIXME Where is the method used?

NOTE: `getAllPools` is used to calculate pool names for link:spark-webui-AllStagesPage.adoc#pool-names[Stages tab in web UI] when FAIR scheduling mode is used.

=== [[defaultParallelism]] Getting Default Level of Parallelism
=== [[defaultParallelism]] Computing Default Level of Parallelism

*Default level of parallelism* is the number of link:spark-rdd-partitions.adoc[partitions] when not specified explicitly by a user.
*Default level of parallelism* is the number of link:spark-rdd-partitions.adoc[partitions] in RDDs that are created without a user specifying the number explicitly.

It is used for methods like `SparkContext.parallelize`, `SparkContext.range` and `SparkContext.makeRDD` (as well as link:spark-streaming.adoc[Spark Streaming]'s `DStream.countByValue` and `DStream.countByValueAndWindow`, and a few other places). It is also used to instantiate link:spark-rdd-partitions.adoc#HashPartitioner[HashPartitioner] and for the minimum number of partitions in link:spark-rdd-hadooprdd.adoc[HadoopRDDs].

SparkContext queries link:spark-taskscheduler.adoc[TaskScheduler] for the default level of parallelism (refer to link:spark-taskscheduler.adoc#contract[TaskScheduler Contract]).
Internally, `defaultParallelism` relays requests for the default level of parallelism to link:spark-taskscheduler.adoc#defaultParallelism[TaskScheduler] (it is a part of its contract).
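
The following is a minimal sketch (assuming a `spark-shell` session with `sc` in scope; the actual value depends on the master URL and the number of cores) of how the default level of parallelism drives the number of partitions of RDDs created without an explicit partition count:

[source, scala]
----
// the number of partitions used when none is given explicitly
// (the actual value depends on the master URL and the number of cores)
sc.defaultParallelism

// falls back to defaultParallelism when no number of partitions is given
sc.parallelize(1 to 100).getNumPartitions

// an explicit number of partitions takes precedence
sc.parallelize(1 to 100, 4).getNumPartitions   // 4
----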

=== [[version]] Getting Spark Version

2 changes: 2 additions & 0 deletions spark-sql-catalyst.adoc
@@ -2,6 +2,8 @@

CAUTION: FIXME Review https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala[sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala].

Catalyst is a framework for manipulating trees of relational operators and expressions. It represents structured queries as logical query plans that it can translate into executable code (run as Spark RDDs).

Catalyst is an extensible *query plan optimizer* with link:spark-sql-catalyst-constant-folding.adoc[constant folding], link:spark-sql-predicate-pushdown.adoc[predicate pushdown], link:spark-sql-catalyst-nullability-propagation.adoc[nullability (NULL value) propagation], link:spark-sql-catalyst-vectorized-parquet-decoder.adoc[vectorized Parquet decoder] and link:spark-sql-whole-stage-codegen.adoc[whole-stage code generation] optimization techniques.

Catalyst supports both rule-based and cost-based optimization.
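
A quick way to observe one of these techniques, constant folding, is to compare the parsed and optimized plans of a query with a constant expression (a minimal sketch assuming a `spark-shell` session; the exact plan output depends on the Spark version and is omitted here):

[source, scala]
----
// in the Optimized Logical Plan the expression 1 + 2 + 3 should be
// folded by Catalyst into a single literal (6)
spark.range(1).selectExpr("1 + 2 + 3 AS sum").explain(extended = true)
----
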
83 changes: 72 additions & 11 deletions spark-sql-dataset.adoc
@@ -2,12 +2,14 @@

*Dataset* is the Spark SQL API for working with structured data, i.e. records with a known schema.

Datasets are "lazy" and computations are only triggered when an action is invoked. Internally, a `Dataset` represents a link:spark-sql-logical-plan.adoc[logical plan] that describes the computation required to produce the data (for a given link:spark-sql-sparksession.adoc[Spark SQL session]).
Datasets are "lazy" and structured query expressions are only evaluated when an action is invoked. Internally, a `Dataset` represents a link:spark-sql-logical-plan.adoc[logical plan] that describes the query required to produce the data (for a given link:spark-sql-sparksession.adoc[Spark SQL session]).

.Dataset's Internals
image::images/spark-sql-Dataset.png[align="center"]

It is fair to say that a Dataset is a result of executing an expression against data storage like files or databases. The expression can be described by a SQL query or a Scala/Java lambda function.
A Dataset is a result of executing a query expression against data storage like files or databases. The structured query expression can be described by a SQL query or a Scala/Java lambda function.

If a link:spark-sql-logical-plan.adoc[LogicalPlan] is used to <<creating-instance, create a `Dataset`>>, it is link:spark-sql-sessionstate.adoc#executePlan[executed] (using the current link:spark-sql-sessionstate.adoc[SessionState]) to create a corresponding link:spark-sql-query-execution.adoc[QueryExecution].

The Dataset API comes with declarative and type-safe operators (which improve on the earlier experience of data processing using link:spark-sql-dataframe.adoc[DataFrames] that were a set of link:spark-sql-dataframe-row.adoc[Rows]).

@@ -27,6 +29,45 @@ See https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apac

`Dataset` offers the convenience of RDDs, the performance optimizations of DataFrames, and the strong static type-safety of Scala. The last feature, bringing strong type-safety to link:spark-sql-dataframe.adoc[DataFrame], is what makes Dataset so appealing. All the features together give you a more functional programming interface to work with structured data.

[source, scala]
----
scala> spark.range(1).filter('id === 0).explain(true)
== Parsed Logical Plan ==
'Filter ('id = 0)
+- Range (0, 1, splits=8)
== Analyzed Logical Plan ==
id: bigint
Filter (id#51L = cast(0 as bigint))
+- Range (0, 1, splits=8)
== Optimized Logical Plan ==
Filter (id#51L = 0)
+- Range (0, 1, splits=8)
== Physical Plan ==
*Filter (id#51L = 0)
+- *Range (0, 1, splits=8)

scala> spark.range(1).filter(_ == 0).explain(true)
== Parsed Logical Plan ==
'TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], unresolveddeserializer(newInstance(class java.lang.Long))
+- Range (0, 1, splits=8)
== Analyzed Logical Plan ==
id: bigint
TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], newInstance(class java.lang.Long)
+- Range (0, 1, splits=8)
== Optimized Logical Plan ==
TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], newInstance(class java.lang.Long)
+- Range (0, 1, splits=8)
== Physical Plan ==
*Filter <function1>.apply
+- *Range (0, 1, splits=8)
----

Only Datasets offer syntax and analysis checks at compile time (which was not possible using link:spark-sql-dataframe.adoc[DataFrame], regular SQL queries or even RDDs).

Using `Dataset` objects turns a `DataFrame` of link:spark-sql-dataframe-row.adoc[Row] instances into a `DataFrame` of case classes with proper names and types (following their equivalents in the case classes). Instead of using indices to access respective fields in a DataFrame and casting them to a type, all this is automatically handled by Datasets and checked by the Scala compiler.
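
A minimal sketch of the conversion (assuming a `spark-shell` session with implicits in scope; the `Person` case class and the sample data are made up for illustration):

[source, scala]
----
case class Person(id: Long, name: String)

// a DataFrame of Rows...
val people = Seq((0L, "Jacek"), (1L, "Agata")).toDF("id", "name")

// ...becomes a Dataset of Person objects
val ds = people.as[Person]

// field access is now checked by the Scala compiler
ds.filter(_.name.startsWith("A")).show
----
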
@@ -47,6 +88,29 @@ The default storage level for `Datasets` is link:spark-rdd-caching.adoc[MEMORY_A

Spark 2.0 has introduced a new query model called link:spark-sql-structured-streaming.adoc[Structured Streaming] for continuous incremental execution of structured queries. That made it possible to consider Datasets both as static, bounded data and as streaming, unbounded data, with a single API.

=== [[creating-instance]] Creating Datasets

If a link:spark-sql-logical-plan.adoc[LogicalPlan] is used to create a `Dataset`, it is link:spark-sql-sessionstate.adoc#executePlan[executed] (using the current link:spark-sql-sessionstate.adoc[SessionState]) to create a corresponding link:spark-sql-query-execution.adoc[QueryExecution].

=== [[show]] Displaying Records (show methods)

CAUTION: FIXME

Internally, `show` delegates to a private `showString` to do the formatting. It turns the `Dataset` into a `DataFrame` (by calling `toDF()`) and <<take, takes the first `n` records>>.
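
A short usage sketch (assuming a `spark-shell` session):

[source, scala]
----
val ds = spark.range(5)

ds.show()                      // the first 20 records (the default), truncating long cells
ds.show(3)                     // the first 3 records
ds.show(3, truncate = false)   // the first 3 records with no truncation
----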

=== [[take]] Taking First n Records (take method)

[source, scala]
----
take(n: Int): Array[T]
----

`take` is an action on a `Dataset` that returns a collection of `n` records.

WARNING: `take` loads the requested records into the memory of the Spark application's driver process, so a large `n` could result in `OutOfMemoryError`.

Internally, `take` creates a new `Dataset` with a `Limit` logical plan over a `Literal` expression (for `n`) and the current `LogicalPlan`. It then runs the link:spark-sql-spark-plan.adoc[SparkPlan] that produces an `Array[InternalRow]` that is in turn decoded to `Array[T]` using a bounded link:spark-sql-Encoder.adoc[encoder].
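
A short usage sketch (assuming a `spark-shell` session):

[source, scala]
----
val ds = spark.range(100)

// take is an action: it triggers execution and returns the records to the driver
val first3 = ds.take(3)   // Array(0, 1, 2)
----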

=== [[join]] join

CAUTION: FIXME
@@ -341,26 +405,23 @@ explain(extended: Boolean): Unit

TIP: If you are serious about query debugging you could also use the link:spark-sql-query-execution.adoc#debug[Debugging Query Execution facility].

Internally, `explain` uses `SQLContext.executePlan(logicalPlan)`.
Internally, `explain` executes an `ExplainCommand` logical plan on link:spark-sql-query-execution.adoc[the logical plan of the QueryExecution of the Dataset].

[source, scala]
----
val ds = spark.range(10)
scala> ds.explain(extended = true)
scala> spark.range(10).explain(extended = true)
== Parsed Logical Plan ==
Range 0, 10, 1, 8, [id#9L]
Range (0, 10, splits=8)
== Analyzed Logical Plan ==
id: bigint
Range 0, 10, 1, 8, [id#9L]
Range (0, 10, splits=8)
== Optimized Logical Plan ==
Range 0, 10, 1, 8, [id#9L]
Range (0, 10, splits=8)
== Physical Plan ==
WholeStageCodegen
: +- Range 0, 1, 8, 10, [id#9L]
*Range (0, 10, splits=8)
----

=== [[select]] select
20 changes: 10 additions & 10 deletions spark-sql-logical-plan.adoc
@@ -2,15 +2,15 @@

CAUTION: FIXME

*Logical Plan* is an abstract representation of a query (that "produces" a link:spark-sql-dataset.adoc[Dataset]).
*Logical Plan* is an abstract representation of a structured query expression (that backs a link:spark-sql-dataset.adoc[Dataset]).

It is modelled as `LogicalPlan` abstract class which is a custom link:spark-sql-query-plan.adoc[QueryPlan].

It may or may not be *analyzed* which is to denote that the plan (including children) has gone through analysis and verification. It may also be *resolved* to a specific schema.
It can be *analyzed*, which is to say that the plan (including its children) has gone through analysis and verification. It may also be *resolved* to a specific schema.

`LogicalPlan` knows the size of objects that are results of SQL operators, like `join` through `Statistics` object.
`LogicalPlan` knows the size of objects that are results of query operators, like `join`, through its `Statistics` object.

`LogicalPlan` knows the maximum number of rows it can compute.
`LogicalPlan` knows the maximum number of records it can compute.
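
A minimal sketch of getting at the logical plans behind a `Dataset` (assuming a `spark-shell` session):

[source, scala]
----
val ds = spark.range(4).filter('id > 1)

// the plans behind the Dataset, exposed through its QueryExecution
val parsed    = ds.queryExecution.logical        // parsed (unresolved) logical plan
val analyzed  = ds.queryExecution.analyzed       // analyzed logical plan
val optimized = ds.queryExecution.optimizedPlan  // optimized logical plan

println(optimized.treeString)
----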

=== [[specialized-logical-plans]] Specialized LogicalPlans

@@ -24,9 +24,9 @@ It may or may not be *analyzed* which is to denote that the plan (including chil

The following is a list of join types (see the usage sketch after the list):

* INNER
* LEFT OUTER
* RIGHT OUTER
* FULL OUTER
* LEFT SEMI
* NATURAL
* `INNER`
* `LEFT OUTER`
* `RIGHT OUTER`
* `FULL OUTER`
* `LEFT SEMI`
* `NATURAL`
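
As a sketch of how these join types surface in the Dataset API (assuming a `spark-shell` session; the two DataFrames are made up for illustration), the join type is passed to `join` as a string such as `"inner"`, `"left_outer"`, `"right_outer"`, `"full_outer"` or `"left_semi"`:

[source, scala]
----
val left  = Seq((0, "zero"), (1, "one")).toDF("id", "name")
val right = Seq((0, "yes")).toDF("id", "flag")

// left outer join on the id column
left.join(right, Seq("id"), "left_outer").show
----
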
13 changes: 7 additions & 6 deletions spark-sql-sparksession.adoc
@@ -104,7 +104,7 @@ scala> nums.show

The link:spark-sql-logical-plan.adoc[LogicalPlan] is `LocalRelation` (for the input `data` collection) or `LogicalRDD` (for the input `RDD[T]`).
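
A minimal sketch of the two cases (assuming a `spark-shell` session with `spark` and `sc` in scope):

[source, scala]
----
// from a local collection: the logical plan should be a LocalRelation
val fromSeq = spark.createDataset(Seq(1, 2, 3))
println(fromSeq.queryExecution.logical)

// from an RDD: the logical plan should be a LogicalRDD
val fromRDD = spark.createDataset(sc.parallelize(Seq(1, 2, 3)))
println(fromRDD.queryExecution.logical)
----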

=== [[range]] range methods
=== [[range]] Creating Dataset With Single Long Column (range methods)

[source, scala]
----
@@ -114,22 +114,23 @@ range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]
----

`range` family of methods create a link:spark-sql-dataset.adoc[Dataset] of longs.
The `range` family of methods creates a link:spark-sql-dataset.adoc[Dataset] of `Long` numbers.

[source, scala]
----
scala> spark.range(0, 10, 2, 5).show
scala> spark.range(start = 0, end = 10, step = 2, numPartitions = 5).show
+---+
| id|
+---+
| 0|
| 2|
| 4|
| 6|
| 8|
+---+
----

NOTE: The first three variants (that do not specify `numPartitions` explicitly) use link:spark-sparkcontext.adoc#defaultParallelism[SparkContext.defaultParallelism] for the number of partitions `numPartitions`.

Internally, `range` creates a new `Dataset[Long]` with a `Range` link:spark-sql-logical-plan.adoc[logical plan] and the `Encoders.LONG` link:spark-sql-Encoder.adoc[encoder].

=== [[emptyDataFrame]] emptyDataFrame

[source, scala]
Expand Down
6 changes: 4 additions & 2 deletions spark-taskscheduler.adoc
@@ -124,9 +124,11 @@ NOTE: It is currently called by link:spark-dagscheduler.adoc#initialization[DAGS
defaultParallelism(): Int
----

`defaultParallelism` calculates the default level of parallelism to use in a cluster that is a hint to sizing jobs.
`defaultParallelism` calculates the default level of parallelism to use in a Spark application, i.e. the number of partitions in RDDs and also a hint for sizing jobs.

NOTE: It is currently called by link:spark-sparkcontext.adoc#defaultParallelism[SparkContext for its defaultParallelism].
NOTE: It is called by link:spark-sparkcontext.adoc#defaultParallelism[`SparkContext` for its `defaultParallelism`].

TIP: Read more in link:spark-taskschedulerimpl.adoc#defaultParallelism[Calculating Default Level of Parallelism (defaultParallelism method)] for the one and only implementation of the `TaskScheduler` contract -- `TaskSchedulerImpl`.

=== [[applicationId]] Calculating Application ID (applicationId method)

4 changes: 2 additions & 2 deletions spark-taskschedulerimpl.adoc
Expand Up @@ -203,9 +203,9 @@ It uses `checkSpeculatableTasks` method that asks `rootPool` to check for specul

CAUTION: FIXME How does Spark handle repeated results of speculative tasks since there are copies launched?

=== [[defaultParallelism]] Default Level of Parallelism
=== [[defaultParallelism]] Calculating Default Level of Parallelism (defaultParallelism method)

*Default level of parallelism* is a hint for sizing jobs.
*Default level of parallelism* is a hint for sizing jobs. It is a part of the link:spark-taskscheduler.adoc#contract[TaskScheduler contract] and link:spark-sparkcontext.adoc#defaultParallelism[used by SparkContext] to create RDDs with the right number of partitions when not specified explicitly.

`TaskSchedulerImpl` uses link:spark-scheduler-backends.adoc#defaultParallelism[SchedulerBackend.defaultParallelism()] to calculate the value, i.e. it just passes it along to a scheduler backend.

