PHOENIX-6859 Update phoenix5-spark3 README with PySpark code references #92

Open · wants to merge 6 commits into `master`

191 changes (155 additions, 36 deletions) in `phoenix5-spark3/README.md`
See the License for the specific language governing permissions and
limitations under the License.
-->

# Phoenix5-Spark3 Connector

The phoenix5-spark3 connector extends Phoenix's MapReduce support to allow Spark
to load Phoenix tables as DataFrames, and enables persisting DataFrames back to Phoenix.

> **Reviewer comment (Contributor):** I realize that we use "Plugin" on the website, but we should standardize on "Connector".

## Pre-Requisites

* Phoenix 5.1.2+
* Spark 3.0.3+

## Why not JDBC

Although Spark supports connecting directly to JDBC databases, it can only
parallelize queries by partitioning on a numeric column. It also requires a
known lower bound, upper bound, and partition count in order to create split
queries.

In contrast, the phoenix-spark integration is able to leverage the underlying
splits provided by Phoenix in order to retrieve and save data across multiple
workers. All that is required is a database URL and a table name.
> **Reviewer comment (Contributor):** Use "select statement" instead of "table name"?

An optional list of SELECT columns can be given, and filter predicates are
automatically pushed down to Phoenix for efficient filtering.
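
For illustration, here is a minimal, hypothetical PySpark sketch of the difference. It assumes an existing `SparkSession` named `spark`, the `TABLE1` example table defined later in this README, a ZooKeeper quorum at `phoenix-server:2181`, the Phoenix JDBC thick driver on the classpath, and made-up partition bounds for the JDBC case:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("phoenix-vs-jdbc").getOrCreate()

# Plain Spark JDBC: parallel reads must be configured by hand with a numeric
# partition column and explicit bounds.
jdbc_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:phoenix:phoenix-server:2181")
    .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
    .option("dbtable", "TABLE1")
    .option("partitionColumn", "ID")
    .option("lowerBound", "0")
    .option("upperBound", "1000000")
    .option("numPartitions", "16")
    .load()
)

# phoenix-spark connector: splits come from Phoenix itself, so only the table
# name and ZK URL are needed.
phoenix_df = (
    spark.read.format("phoenix")
    .option("table", "TABLE1")
    .option("zkUrl", "phoenix-server:2181")
    .load()
)
```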
> **Reviewer comment (Contributor):** This sounds like you need to specify the pushdown predicates. Can you rephrase so that it's apparent that pushdown is automatic?


The choice of which method to use to access
Phoenix comes down to each specific use case.
> **Reviewer comment (Contributor):** Nit: this is super important, and we should have much more on this (though not necessarily in this ticket).


## Setup
> **Reviewer comment (Contributor):** Nit: this assumes that Phoenix and HBase/Spark are both present and configured on the same nodes. Maybe worth mentioning it?


To set up the connector, add the `phoenix5-spark3-shaded` JAR as a dependency in your Spark job:

> **Reviewer comment (Contributor):** In most cases, you don't want to add the connector to the maven/compile classpath; it tends to cause conflicts when upgrading. We should move this to the end of the section, and add the caveat that this is only needed for the deprecated usages.

```xml
<dependency>
  <groupId>org.apache.phoenix</groupId>
  <artifactId>phoenix5-spark3-shaded</artifactId>
  <version>${phoenix.connectors.version}</version>
</dependency>
```

Additionally, you must add the hbase mapredcp libraries and the hbase
configuration directory to the classpath. The final classpath should look
something like:

`/etc/hbase/conf:$(hbase mapredcp):phoenix5-spark3-shaded-{phoenix.connectors.version}.jar`

NOTE:

* Use the exact paths appropriate to your system.
* Set both the `spark.driver.extraClassPath` and `spark.executor.extraClassPath`
  properties to the aforementioned classpath. You can add them to
  `spark-defaults.conf`, or specify them on the `spark-shell` or `spark-submit`
  command line.

## Reading Phoenix Tables

```sql
-- ... (preceding lines collapsed in the diff view)
UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');
```

### Load as a DataFrame using the DataSourceV2 API

Scala example:

> **Reviewer comment (Contributor):** I know you didn't touch that part, but do we still need the SparkContext import?

> **Reviewer comment (Contributor):** Maybe add comments to make it obvious that you need to use a real ZK quorum, like `// replace "phoenix-server:2181" with the real ZK quorum`

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
// ... (lines collapsed in the diff view)
df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
.select(df("ID"))
.show
```

Java example:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
// ... (lines collapsed in the diff view)
public class PhoenixSparkRead {
    // ...
}
```

PySpark example:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

ss = SparkSession.builder.appName("phoenix-test").getOrCreate()

# Replace "phoenix-server:2181" with the real ZK quorum of your cluster
df = ss.read.format("phoenix").option("table", "TABLE1").option("zkUrl", "phoenix-server:2181").load()

# Approach 1: DataFrame API
df.filter((df.COL1 == "test_row_1") & (df.ID == 1)).select(col("ID")).show()

# Approach 2: Spark SQL on a temporary view
df.createOrReplaceTempView("TABLE1_TEMP")
ss.sql("SELECT * FROM TABLE1_TEMP WHERE COL1='test_row_1' AND ID=1L").show()
```

## Saving to Phoenix

### Save DataFrames to Phoenix using DataSourceV2

The `save` method on DataFrame allows passing in a data source type.
You can use `phoenix` for DataSourceV2 and must also pass in a
`table` and `zkUrl` parameter to specify which table and server
to persist the DataFrame to. The column names are derived from the
DataFrame's schema field names, and must match the Phoenix column names.

The `save` method also takes a `SaveMode` option,
for which only `SaveMode.Append` is supported.

Given two Phoenix tables with the following DDL:

```sql
CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);

UPSERT INTO INPUT_TABLE (ID, COL1, COL2) VALUES (1, 'test_row_1', 1);
UPSERT INTO INPUT_TABLE (ID, COL1, COL2) VALUES (2, 'test_row_2', 2);
```
You can load from an input table and save to an output table as a DataFrame as follows:

Scala example:

```scala
import org.apache.spark.SparkContext
// ... (lines collapsed in the diff view)
df.write
.options(Map("table" -> "OUTPUT_TABLE", "zkUrl" -> "phoenix-server:2181"))
.save()
```

Java example:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
// ... (lines collapsed in the diff view)
public class PhoenixSparkWriteFromInputTable {
    // ...
}
```

PySpark example:

```python
from pyspark.sql import SparkSession

ss = SparkSession.builder.appName("phoenix-test").getOrCreate()

# Replace "phoenix-server:2181" with the real ZK quorum of your cluster
df = ss.read.format("phoenix").option("table", "INPUT_TABLE").option("zkUrl", "phoenix-server:2181").load()
df.write.format("phoenix").option("table", "OUTPUT_TABLE").option("zkUrl", "phoenix-server:2181").mode("append").save()
```

### Save from an external RDD with a schema to a Phoenix table

Just like the previous example, you can pass in the data source type as
`phoenix` and specify the `table` and `zkUrl` parameters indicating which
table and server to persist the DataFrame to.

Note that the schema of the RDD must match its column data and this must match
the schema of the Phoenix table that you save to.

Given an output Phoenix table with the following DDL:

```sql
CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
```
You can save a DataFrame from an RDD as follows:

Scala example:

```scala
import org.apache.spark.SparkContext
// ... (lines collapsed in the diff view)
df.write
.mode(SaveMode.Append)
.save()
```

Java example:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
// ... (lines collapsed in the diff view)
public class PhoenixSparkWriteFromRDDWithSchema {
    // ...
}
```

PySpark example:

```python
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType

ss = SparkSession.builder.appName("phoenix-test").getOrCreate()

# The schema must match the Phoenix table: ID BIGINT, COL1 VARCHAR, COL2 INTEGER
schema = StructType([
    StructField("ID", LongType()),
    StructField("COL1", StringType()),
    StructField("COL2", IntegerType())
])
dataSet = [Row(1, "1", 1), Row(2, "2", 2), Row(3, "3", 3)]

rdd = ss.sparkContext.parallelize(dataSet)
df = ss.createDataFrame(rdd, schema)

# Replace "phoenix-server:2181" with the real ZK quorum of your cluster
df.write.format("phoenix").option("table", "OUTPUT_TABLE").option("zkUrl", "phoenix-server:2181").mode("append").save()
```

## Notes

* If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"`
  instead of `"phoenix"`, however this is deprecated as of `connectors-1.0.0`
  (see the sketch at the end of this section).
* The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support
  optionally specifying a `conf` Hadoop configuration parameter with custom Phoenix client settings,
  as well as an optional `zkUrl` parameter for the Phoenix connection URL.
* If `zkUrl` isn't specified, it's assumed that the `hbase.zookeeper.quorum` property has been set
  in the `conf` parameter. Similarly, if no configuration is passed in, `zkUrl` must be specified.
* As of [PHOENIX-5197](https://issues.apache.org/jira/browse/PHOENIX-5197),
  you can pass configurations from the driver to executors as a comma-separated list
  against the key `phoenixConfigs`, i.e. `PhoenixDataSource.PHOENIX_CONFIGS`. For example:

```scala
df = spark
.sqlContext
// ... (lines collapsed in the diff view)
.options(Map("table" -> "Table1", "zkUrl" -> "phoenix-server:2181", "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
.load;
```
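
For reference, a PySpark sketch of the same `phoenixConfigs` option, using the same placeholder table name and ZooKeeper quorum as the Scala example above:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("phoenix-test").getOrCreate()

# Pass HBase/Phoenix client settings from the driver to the executors
# via the `phoenixConfigs` option.
df = (
    spark.read.format("phoenix")
    .option("table", "Table1")
    .option("zkUrl", "phoenix-server:2181")
    .option("phoenixConfigs", "hbase.client.retries.number=10,hbase.client.pause=10000")
    .load()
)
```
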
This list of properties is parsed and populated into a properties map
which is passed to `DriverManager.getConnection(connString, propsMap)`.
Note that the same property values will be used for both the driver
and all executors, and these configurations are used each time a connection
is made (both on the driver and executors).
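
As referenced in the first note above, a minimal, hypothetical PySpark sketch of the deprecated DataSourceV1 usage, with the same placeholder table and ZooKeeper quorum as the earlier examples:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("phoenix-test").getOrCreate()

# Deprecated as of connectors-1.0.0: the DataSourceV1 source type
# "org.apache.phoenix.spark" is used instead of "phoenix".
df_v1 = (
    spark.read.format("org.apache.phoenix.spark")
    .option("table", "TABLE1")
    .option("zkUrl", "phoenix-server:2181")
    .load()
)
```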

## Limitations

* Basic support for column and predicate pushdown using the Data Source API
* The Data Source API does not support passing custom Phoenix settings in configuration; you must
  create the DataFrame or RDD directly if you need fine-grained configuration.
* No support for aggregate or distinct functions (see [Phoenix MR](http://phoenix.apache.org/phoenix_mr.html))

## Limitations of the Spark3 connector compared to the Spark2 connector

* Non-uppercase column names cannot be used for mapping DataFrames. (PHOENIX-6668)
* When writing to a DataFrame, every SQL column in the table must be specified. (PHOENIX-6667)

## Deprecated Usages

### Load as a DataFrame directly using a Configuration object

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
// ... (lines collapsed in the diff view)
df.show
```

### Load as an RDD, using a Zookeeper URL

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
// ... (lines collapsed in the diff view)
val firstCol = rdd.first()("COL1").asInstanceOf[String]
```

### Saving RDDs to Phoenix

`saveToPhoenix` is an implicit method on RDD[Product], or an RDD of Tuples.
The data types must correspond to the Java types Phoenix supports (see
[Phoenix Datatypes](http://phoenix.apache.org/language/datatypes.html)).

Given a Phoenix table with the following DDL:

*(The DDL and the beginning of the example are collapsed in the diff view.)*

```scala
sc
  // ... (collapsed lines)
    Seq("ID","COL1","COL2"),
    zkUrl = Some("phoenix-server:2181")
  )
```