From f79af31e23aee9f9d73a1cdfceee18e387f61b88 Mon Sep 17 00:00:00 2001
From: Abhey Rana
Date: Mon, 23 Jan 2023 18:34:12 +0530
Subject: [PATCH 1/6] Update phoenix5-spark3 README with PySpark references

---
 phoenix5-spark3/README.md | 70 +++++++++++++++++++++++++++++++++++----
 1 file changed, 63 insertions(+), 7 deletions(-)

diff --git a/phoenix5-spark3/README.md b/phoenix5-spark3/README.md
index 40e769ba..3f1d3a31 100644
--- a/phoenix5-spark3/README.md
+++ b/phoenix5-spark3/README.md
@@ -15,18 +15,39 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 
-phoenix-spark extends Phoenix's MapReduce support to allow Spark to load Phoenix tables as DataFrames,
+The phoenix5-spark3 plugin extends Phoenix's MapReduce support to allow Spark to load Phoenix tables as DataFrames,
 and enables persisting DataFrames back to Phoenix.
 
-## Configuring Spark to use the connector
+## Pre-Requisites
+* Phoenix 5.1.2+
+* Spark 3.0.3+
 
-Use the shaded connector JAR `phoenix5-spark3-shaded-6.0.0-SNAPSHOT.jar` .
-Apart from the shaded connector JAR, you also need to add the hbase mapredcp libraries and the hbase configuration directory to the classpath. The final classpath should be something like
+## Why not JDBC?
 
-`/etc/hbase/conf:$(hbase mapredcp):phoenix5-spark3-shaded-6.0.0-SNAPSHOT.jar`
+Although Spark supports connecting directly to JDBC databases, it’s only able to parallelize queries by partitioning on a numeric column. It also requires a known lower bound, upper bound and partition count in order to create split queries.
 
-(add the exact paths as appropiate to your system)
-Both the `spark.driver.extraClassPath` and `spark.executor.extraClassPath` properties need to be set the above classpath. You may add them spark-defaults.conf, or specify them on the spark-shell or spark-submit command line.
+In contrast, the phoenix-spark integration is able to leverage the underlying splits provided by Phoenix in order to retrieve and save data across multiple workers. All that’s required is a database URL and a table name. Optional SELECT columns can be given, as well as pushdown predicates for efficient filtering.
+
+The choice of which method to use to access Phoenix comes down to each specific use case.
+
+## Setup
+
+To set up the connector, add the `phoenix5-spark3-shaded` JAR as a dependency in your Spark job like -
+```
+<dependency>
+  <groupId>org.apache.phoenix</groupId>
+  <artifactId>phoenix5-spark3-shaded</artifactId>
+  <version>${phoenix.connectors.version}</version>
+</dependency>
+```
+
+Additionally, you must add the hbase mapredcp libraries and the hbase configuration directory to the classpath. The final classpath should be something like -
+
+`/etc/hbase/conf:$(hbase mapredcp):phoenix5-spark3-shaded-{phoenix.connectors.version}.jar`
+
+NOTE:
+* Use the exact paths as appropriate to your system.
+* Set both `spark.driver.extraClassPath` and `spark.executor.extraClassPath` properties to the aforementioned classpath. You can add them to the `spark-defaults.conf`, or specify them in the `spark-shell` or `spark-submit` command line utilities.
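+To make the "Why not JDBC?" comparison above concrete, here is an illustrative PySpark sketch. It assumes the example `TABLE1` and the `phoenix-server:2181` quorum used elsewhere in this README; the partition bounds for the plain JDBC read are made-up placeholders:
+
+```python
+from pyspark.sql import SparkSession
+
+ss = SparkSession.builder.appName("phoenix-vs-jdbc").getOrCreate()
+
+# Plain Spark JDBC source: parallel reads need a numeric partition column plus explicit bounds.
+jdbc_df = (ss.read.format("jdbc")
+    .option("url", "jdbc:phoenix:phoenix-server:2181")
+    .option("dbtable", "TABLE1")
+    .option("partitionColumn", "ID")
+    .option("lowerBound", "0")
+    .option("upperBound", "1000000")
+    .option("numPartitions", "10")
+    .load())
+
+# phoenix5-spark3 connector: splits come from Phoenix itself, so only the table and zkUrl are needed.
+phoenix_df = (ss.read.format("phoenix")
+    .option("table", "TABLE1")
+    .option("zkUrl", "phoenix-server:2181")
+    .load())
+```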
 
 ## Reading Phoenix Tables
 
@@ -92,6 +113,14 @@ public class PhoenixSparkRead {
     }
 }
 ```
+PySpark example:
+```python
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import col
+
+ss = SparkSession.builder.appName("phoenix-test").getOrCreate()
+df = ss.read.format("phoenix").option("table", "TABLE1").option("zkUrl", "phoenix-server:2181").load()
+```
 
 ## Saving to Phoenix
 
@@ -109,6 +138,9 @@ Given two Phoenix tables with the following DDL:
 ```sql
 CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
 CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+
+UPSERT INTO INPUT_TABLE (ID, COL1, COL2) VALUES (1, 'test_row_1', 1);
+UPSERT INTO INPUT_TABLE (ID, COL1, COL2) VALUES (2, 'test_row_2', 2);
 ```
 
 you can load from an input table and save to an output table as a DataFrame as follows in Scala:
@@ -171,6 +203,14 @@ public class PhoenixSparkWriteFromInputTable {
     }
 }
 ```
+```python
+from pyspark.sql import SparkSession
+
+ss = SparkSession.builder.appName("phoenix-test").getOrCreate()
+
+df = ss.read.format("phoenix").option("table", "INPUT_TABLE").option("zkUrl", "phoenix-server:2181").load()
+df.write.format("phoenix").option("table", "OUTPUT_TABLE").option("zkUrl", "phoenix-server:2181").mode("append").save()
+```
 
 ### Save from an external RDD with a schema to a Phoenix table
 
@@ -268,6 +308,22 @@ public class PhoenixSparkWriteFromRDDWithSchema {
     }
 }
 ```
+```python
+from pyspark.sql import SparkSession
+from pyspark.sql.types import LongType, Row, StringType, StructField, StructType
+
+ss = SparkSession.builder.appName("phoenix-test").getOrCreate()
+dataSet = [
+    Row("Maharastra", "Mumbai", 20667655),
+    Row("West Bengal", "Kolkata", 14974073),
+    Row("Karnatka", "Bangalore", 12764935)
+]
+
+rdd = ss.sparkContext.parallelize(dataSet)
+df = ss.createDataFrame(rdd, schema)
+
+df.write.format("phoenix").option("table", "OUTPUT_TABLE").option("zkUrl", "phoenix-server:2181").mode("append").save()
+```
 
 ## Notes
 
From 553dd47d8a44fca6cda4c4b93c791861bb452637 Mon Sep 17 00:00:00 2001
From: Abhey Rana
Date: Mon, 23 Jan 2023 19:02:34 +0530
Subject: [PATCH 2/6] Minor code fixes in the 'Save from an external RDD with a schema to a Phoenix table' section

---
 phoenix5-spark3/README.md | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/phoenix5-spark3/README.md b/phoenix5-spark3/README.md
index 3f1d3a31..9d2d7769 100644
--- a/phoenix5-spark3/README.md
+++ b/phoenix5-spark3/README.md
@@ -313,10 +313,8 @@ from pyspark.sql import SparkSession
 from pyspark.sql.types import LongType, Row, StringType, StructField, StructType
 
 ss = SparkSession.builder.appName("phoenix-test").getOrCreate()
-dataSet = [
-    Row("Maharastra", "Mumbai", 20667655),
-    Row("West Bengal", "Kolkata", 14974073),
-    Row("Karnatka", "Bangalore", 12764935)
+schema = StructType([StructField("ID", LongType()), StructField("COL1", StringType()), StructField("COL2", LongType())])
+dataSet = [Row(1, "1", 1),Row(2, "2", 2), Row(3, "3", 3)
 ]
 
 rdd = ss.sparkContext.parallelize(dataSet)
From 6ed80fa9fa110a9c6c5db3ea4d82e7e45abbcf8b Mon Sep 17 00:00:00 2001
From: Abhey Rana
Date: Mon, 23 Jan 2023 19:16:17 +0530
Subject: [PATCH 3/6] Add missing filtering statements

---
 phoenix5-spark3/README.md | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/phoenix5-spark3/README.md b/phoenix5-spark3/README.md
index 9d2d7769..90ba3b7d 100644
--- a/phoenix5-spark3/README.md
+++ b/phoenix5-spark3/README.md
@@ -120,6 +120,13 @@ from pyspark.sql.functions import col
 
 ss = SparkSession.builder.appName("phoenix-test").getOrCreate()
 df = ss.read.format("phoenix").option("table", "TABLE1").option("zkUrl", "phoenix-server:2181").load()
+
+# Approach - 1
+df.filter((df.COL1 == "test_row_1") | (df.ID == 1)).select(col("ID")).show()
+
+# Approach - 2
+df.createOrReplaceTempView("INPUT_TABLE_TEMP")
+ss.sql("SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1L").show()
 ```
 
 ## Saving to Phoenix
@@ -142,8 +149,10 @@ CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 IN
 
 UPSERT INTO INPUT_TABLE (ID, COL1, COL2) VALUES (1, 'test_row_1', 1);
 UPSERT INTO INPUT_TABLE (ID, COL1, COL2) VALUES (2, 'test_row_2', 2);
 ```
-you can load from an input table and save to an output table as a DataFrame as follows in Scala:
+You can load from an input table and save to an output table as a DataFrame as follows:
+
+Scala example:
 ```scala
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
@@ -203,6 +212,7 @@ public class PhoenixSparkWriteFromInputTable {
     }
 }
 ```
+PySpark example:
 ```python
 from pyspark.sql import SparkSession
 
@@ -225,8 +235,10 @@ Given an output Phoenix table with the following DDL:
 ```sql
 CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
 ```
-you can save a dataframe from an RDD as follows in Scala:
+You can save a dataframe from an RDD as follows:
+
+Scala example:
 ```scala
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, StructField}
@@ -308,14 +320,14 @@ public class PhoenixSparkWriteFromRDDWithSchema {
     }
 }
 ```
+PySpark example:
 ```python
 from pyspark.sql import SparkSession
 from pyspark.sql.types import LongType, Row, StringType, StructField, StructType
 
 ss = SparkSession.builder.appName("phoenix-test").getOrCreate()
 schema = StructType([StructField("ID", LongType()), StructField("COL1", StringType()), StructField("COL2", LongType())])
-dataSet = [Row(1, "1", 1),Row(2, "2", 2), Row(3, "3", 3)
-]
+dataSet = [Row(1, "1", 1),Row(2, "2", 2), Row(3, "3", 3)]
 
 rdd = ss.sparkContext.parallelize(dataSet)
 df = ss.createDataFrame(rdd, schema)
From 5dd6f761dc9cb22ddd26a98e96dbc838b4a16f8a Mon Sep 17 00:00:00 2001
From: Abhey Rana
Date: Mon, 23 Jan 2023 19:32:26 +0530
Subject: [PATCH 4/6] Markdown linting fixes

---
 phoenix5-spark3/README.md | 65 +++++++++++++++++++++++++++------------
 1 file changed, 46 insertions(+), 19 deletions(-)

diff --git a/phoenix5-spark3/README.md b/phoenix5-spark3/README.md
index 90ba3b7d..84dd8707 100644
--- a/phoenix5-spark3/README.md
+++ b/phoenix5-spark3/README.md
@@ -14,17 +14,23 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 -->
+# Phoenix5-Spark3 Connector
 
-The phoenix5-spark3 plugin extends Phoenix's MapReduce support to allow Spark to load Phoenix tables as DataFrames,
-and enables persisting DataFrames back to Phoenix.
+The phoenix5-spark3 plugin extends Phoenix's MapReduce support to allow Spark
+ to load Phoenix tables as DataFrames,
+ and enables persisting DataFrames back to Phoenix.
 
 ## Pre-Requisites
+
 * Phoenix 5.1.2+
 * Spark 3.0.3+
 
 ## Why not JDBC?
 
-Although Spark supports connecting directly to JDBC databases, it’s only able to parallelize queries by partitioning on a numeric column. It also requires a known lower bound, upper bound and partition count in order to create split queries.
+Although Spark supports connecting directly to JDBC databases,
+ it’s only able to parallelize queries by partitioning on a numeric column.
+ It also requires a known lower bound,
+ upper bound and partition count in order to create split queries.
 
 In contrast, the phoenix-spark integration is able to leverage the underlying splits provided by Phoenix in order to retrieve and save data across multiple workers. All that’s required is a database URL and a table name. Optional SELECT columns can be given, as well as pushdown predicates for efficient filtering.
 
@@ -33,7 +39,8 @@ The choice of which method to use to access Phoenix comes down to each specific
 ## Setup
 
 To set up the connector, add the `phoenix5-spark3-shaded` JAR as a dependency in your Spark job like -
-```
+
+```xml
 <dependency>
   <groupId>org.apache.phoenix</groupId>
   <artifactId>phoenix5-spark3-shaded</artifactId>
   <version>${phoenix.connectors.version}</version>
 </dependency>
 
@@ -46,6 +53,7 @@ Additionally, you must add the hbase mapredcp libraries and the hbase configurat
 `/etc/hbase/conf:$(hbase mapredcp):phoenix5-spark3-shaded-{phoenix.connectors.version}.jar`
 
 NOTE:
+
 * Use the exact paths as appropriate to your system.
 * Set both `spark.driver.extraClassPath` and `spark.executor.extraClassPath` properties to the aforementioned classpath. You can add them to the `spark-defaults.conf`, or specify them in the `spark-shell` or `spark-submit` command line utilities.
 
@@ -60,7 +68,9 @@ UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');
 ```
 
 ### Load as a DataFrame using the DataSourceV2 API
+
 Scala example:
+
 ```scala
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{SQLContext, SparkSession}
@@ -82,7 +92,9 @@ df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
   .select(df("ID"))
   .show
 ```
+
 Java example:
+
 ```java
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -113,7 +125,9 @@ public class PhoenixSparkRead {
     }
 }
 ```
+
 PySpark example:
+
 ```python
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import col
@@ -125,8 +139,8 @@ df = ss.read.format("phoenix").option("table", "TABLE1").option("zkUrl", "phoeni
 df.filter((df.COL1 == "test_row_1") | (df.ID == 1)).select(col("ID")).show()
 
 # Approach - 2
-df.createOrReplaceTempView("INPUT_TABLE_TEMP")
-ss.sql("SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1L").show()
+df.createOrReplaceTempView("TABLE1_TEMP")
+ss.sql("SELECT * FROM TABLE1_TEMP WHERE COL1='test_row_1' AND ID=1L").show()
 ```
 
 ## Saving to Phoenix
 
@@ -153,6 +167,7 @@ UPSERT INTO INPUT_TABLE (ID, COL1, COL2) VALUES (2, 'test_row_2', 2);
 ```
 
 You can load from an input table and save to an output table as a DataFrame as follows:
 
 Scala example:
+
 ```scala
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
@@ -177,7 +192,9 @@ df.write
   .options(Map("table" -> "OUTPUT_TABLE", "zkUrl" -> "phoenix-server:2181"))
   .save()
 ```
+
 Java example:
+
 ```java
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -212,7 +229,9 @@ public class PhoenixSparkWriteFromInputTable {
     }
 }
 ```
+
 PySpark example:
+
 ```python
 from pyspark.sql import SparkSession
 
@@ -236,9 +255,10 @@ CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 IN
 ```
 
-You can save a dataframe from an RDD as follows: 
+You can save a dataframe from an RDD as follows:
 
 Scala example:
+
 ```scala
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, StructField}
@@ -268,7 +288,9 @@ df.write
   .mode(SaveMode.Append)
   .save()
 ```
+
 Java example:
+
 ```java
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -320,7 +342,9 @@ public class PhoenixSparkWriteFromRDDWithSchema {
     }
 }
 ```
+
 PySpark example:
+
 ```python
 from pyspark.sql import SparkSession
 from pyspark.sql.types import LongType, Row, StringType, StructField, StructType
@@ -337,15 +361,16 @@ df.write.format("phoenix").option("table", "OUTPUT_TABLE").option("zkUrl", "phoe
 
 ## Notes
 
-- If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"`
+* If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"`
 instead of `"phoenix"`, however this is deprecated as of `connectors-1.0.0`.
-- The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support
+* The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support
 optionally specifying a `conf` Hadoop configuration parameter with custom Phoenix client settings,
 as well as an optional `zkUrl` parameter for the Phoenix connection URL.
-- If `zkUrl` isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set
+* If `zkUrl` isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set
 in the `conf` parameter. Similarly, if no configuration is passed in, `zkUrl` must be specified.
-- As of [PHOENIX-5197](https://issues.apache.org/jira/browse/PHOENIX-5197), you can pass configurations from the driver
+* As of [PHOENIX-5197](https://issues.apache.org/jira/browse/PHOENIX-5197), you can pass configurations from the driver
 to executors as a comma-separated list against the key `phoenixConfigs` i.e (PhoenixDataSource.PHOENIX_CONFIGS), for ex:
+
 ```scala
 df = spark
   .sqlContext
@@ -354,26 +379,27 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho
   .options(Map("table" -> "Table1", "zkUrl" -> "phoenix-server:2181", "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
   .load;
 ```
+
   This list of properties is parsed and populated into a properties map which is passed to `DriverManager.getConnection(connString, propsMap)`.
   Note that the same property values will be used for both the driver and all executors and
   these configurations are used each time a connection is made (both on the driver and executors).
 
 ## Limitations
 
-- Basic support for column and predicate pushdown using the Data Source API
-- The Data Source API does not support passing custom Phoenix settings in configuration, you must
+* Basic support for column and predicate pushdown using the Data Source API
+* The Data Source API does not support passing custom Phoenix settings in configuration, you must
 create the DataFrame or RDD directly if you need fine-grained configuration.
-- No support for aggregate or distinct functions (http://phoenix.apache.org/phoenix_mr.html)
+* No support for aggregate or distinct functions (http://phoenix.apache.org/phoenix_mr.html)
 
 ## Limitations of the Spark3 connector compared to the Spark2 Connector
 
-- Non-uppercase column names cannot be used for mapping DataFrames. (PHOENIX-6668)
-- When writing to a DataFrame, every SQL column in the table must be specified. (PHOENIX-6667)
-
+* Non-uppercase column names cannot be used for mapping DataFrames. (PHOENIX-6668)
+* When writing to a DataFrame, every SQL column in the table must be specified. (PHOENIX-6667)
 
 ## Deprecated Usages
 
 ### Load as a DataFrame directly using a Configuration object
+
 ```scala
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkContext
@@ -395,6 +421,7 @@ df.show
 ```
 
 ### Load as an RDD, using a Zookeeper URL
+
 ```scala
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
@@ -417,7 +444,7 @@ val firstCol = rdd.first()("COL1").asInstanceOf[String]
 
 ### Saving RDDs to Phoenix
 
 `saveToPhoenix` is an implicit method on RDD[Product], or an RDD of Tuples. The data types must
-correspond to the Java types Phoenix supports (http://phoenix.apache.org/language/datatypes.html)
+correspond to the Java types Phoenix supports ("http://phoenix.apache.org/language/datatypes.html")
 
 Given a Phoenix table with the following DDL:
 
@@ -439,4 +466,4 @@ sc
   Seq("ID","COL1","COL2"),
   zkUrl = Some("phoenix-server:2181")
 )
-```
\ No newline at end of file
+```
From 64bce1bc6287fd96169c2d219bf79e1f1674e38c Mon Sep 17 00:00:00 2001
From: Abhey Rana
Date: Mon, 23 Jan 2023 21:51:49 +0530
Subject: [PATCH 5/6] Markdown linting fixes

---
 phoenix5-spark3/README.md | 53 +++++++++++++++++++++++++--------------
 1 file changed, 34 insertions(+), 19 deletions(-)

diff --git a/phoenix5-spark3/README.md b/phoenix5-spark3/README.md
index 84dd8707..4ffb85e1 100644
--- a/phoenix5-spark3/README.md
+++ b/phoenix5-spark3/README.md
@@ -14,6 +14,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 -->
+
 # Phoenix5-Spark3 Connector
 
 The phoenix5-spark3 plugin extends Phoenix's MapReduce support to allow Spark
@@ -25,20 +26,26 @@ The phoenix5-spark3 plugin extends Phoenix's MapReduce support to allow Spark
 * Phoenix 5.1.2+
 * Spark 3.0.3+
 
-## Why not JDBC?
+## Why not JDBC
 
 Although Spark supports connecting directly to JDBC databases,
  it’s only able to parallelize queries by partitioning on a numeric column.
 It also requires a known lower bound,
 upper bound and partition count in order to create split queries.
 
-In contrast, the phoenix-spark integration is able to leverage the underlying splits provided by Phoenix in order to retrieve and save data across multiple workers. All that’s required is a database URL and a table name. Optional SELECT columns can be given, as well as pushdown predicates for efficient filtering.
+In contrast, the phoenix-spark integration is able to leverage the underlying
+ splits provided by Phoenix in order to retrieve and save data across multiple
+ workers. All that’s required is a database URL and a table name.
+ Optional SELECT columns can be given,
+ as well as pushdown predicates for efficient filtering.
 
-The choice of which method to use to access Phoenix comes down to each specific use case.
+The choice of which method to use to access
+ Phoenix comes down to each specific use case.
 
 ## Setup
 
-To set up the connector, add the `phoenix5-spark3-shaded` JAR as a dependency in your Spark job like -
+To set up the connector, add the `phoenix5-spark3-shaded` JAR as
+ a dependency in your Spark job like -
 
 ```xml
 <dependency>
   <groupId>org.apache.phoenix</groupId>
   <artifactId>phoenix5-spark3-shaded</artifactId>
   <version>${phoenix.connectors.version}</version>
 </dependency>
 ```
 
-Additionally, you must add the hbase mapredcp libraries and the hbase configuration directory to the classpath. The final classpath should be something like -
+Additionally, you must add the hbase mapredcp libraries and the hbase
+ configuration directory to the classpath.
+ The final classpath should be something like -
 
 `/etc/hbase/conf:$(hbase mapredcp):phoenix5-spark3-shaded-{phoenix.connectors.version}.jar`
 
 NOTE:
@@ -147,12 +156,14 @@ ss.sql("SELECT * FROM TABLE1_TEMP WHERE COL1='test_row_1' AND ID=1L").show()
 
 ### Save DataFrames to Phoenix using DataSourceV2
 
-The `save` is method on DataFrame allows passing in a data source type. You can use
-`phoenix` for DataSourceV2 and must also pass in a `table` and `zkUrl` parameter to
-specify which table and server to persist the DataFrame to. The column names are derived from
-the DataFrame's schema field names, and must match the Phoenix column names.
+The `save` method on DataFrame allows passing in a data source type.
+ You can use `phoenix` for DataSourceV2 and must also pass in a
+ `table` and `zkUrl` parameter to specify which table and server
+ to persist the DataFrame to. The column names are derived from the
+ DataFrame's schema field names, and must match the Phoenix column names.
 
-The `save` method also takes a `SaveMode` option, for which only `SaveMode.Append` is supported.
+The `save` method also takes a `SaveMode` option,
+ for which only `SaveMode.Append` is supported.
 
 Given two Phoenix tables with the following DDL:
 
@@ -164,7 +175,8 @@ UPSERT INTO INPUT_TABLE (ID, COL1, COL2) VALUES (2, 'test_row_2', 2);
 ```
 
-You can load from an input table and save to an output table as a DataFrame as follows:
+You can load from an input table and save to
+ an output table as a DataFrame as follows:
 
 Scala example:
 
@@ -243,11 +255,12 @@ df.write.format("phoenix").option("table", "OUTPUT_TABLE").option("zkUrl", "phoe
 
 ### Save from an external RDD with a schema to a Phoenix table
 
-Just like the previous example, you can pass in the data source type as `phoenix` and specify the `table` and
-`zkUrl` parameters indicating which table and server to persist the DataFrame to.
+Just like the previous example, you can pass in the data source type as
+ `phoenix` and specify the `table` and `zkUrl` parameters indicating which
+ table and server to persist the DataFrame to.
 
-Note that the schema of the RDD must match its column data and this must match the schema of the Phoenix table
-that you save to.
+Note that the schema of the RDD must match its column data and this must match
+ the schema of the Phoenix table that you save to.
 
 Given an output Phoenix table with the following DDL:
 
@@ -374,7 +387,7 @@ df.write.format("phoenix").option("table", "OUTPUT_TABLE").option("zkUrl", "phoe
 
 ## Notes
 
-* If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"`
+* If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"`
 instead of `"phoenix"`, however this is deprecated as of `connectors-1.0.0`.
 * The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support
 optionally specifying a `conf` Hadoop configuration parameter with custom Phoenix client settings,
 as well as an optional `zkUrl` parameter for the Phoenix connection URL.
@@ -402,7 +415,7 @@ in the `conf` parameter. Similarly, if no configuration is passed in, `zkUrl` must be specified.
 
 ## Limitations
 
 * Basic support for column and predicate pushdown using the Data Source API
 * The Data Source API does not support passing custom Phoenix settings in configuration, you must
 create the DataFrame or RDD directly if you need fine-grained configuration.
-* No support for aggregate or distinct functions (http://phoenix.apache.org/phoenix_mr.html)
+* No support for aggregate or distinct functions [Phoenix MR](http://phoenix.apache.org/phoenix_mr.html)
 
 ## Limitations of the Spark3 connector compared to the Spark2 Connector
 
 * Non-uppercase column names cannot be used for mapping DataFrames. (PHOENIX-6668)
 * When writing to a DataFrame, every SQL column in the table must be specified. (PHOENIX-6667)
 
@@ -456,8 +469,10 @@ val firstCol = rdd.first()("COL1").asInstanceOf[String]
 
 ### Saving RDDs to Phoenix
 
-`saveToPhoenix` is an implicit method on RDD[Product], or an RDD of Tuples. The data types must
-correspond to the Java types Phoenix supports ("http://phoenix.apache.org/language/datatypes.html")
+`saveToPhoenix` is an implicit method on RDD[Product],
+ or an RDD of Tuples. The data types must
+ correspond to the Java types Phoenix supports
+ [Phoenix Datatypes](http://phoenix.apache.org/language/datatypes.html)
 
 Given a Phoenix table with the following DDL:
From 1521ce2ec823c2a2ba27d0b411f2ab5de1dced92 Mon Sep 17 00:00:00 2001
From: Abhey Rana
Date: Mon, 23 Jan 2023 22:09:37 +0530
Subject: [PATCH 6/6] Markdown final fixes

---
 phoenix5-spark3/README.md | 25 ++++++++++++++++++-------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/phoenix5-spark3/README.md b/phoenix5-spark3/README.md
index 4ffb85e1..b974a4c0 100644
--- a/phoenix5-spark3/README.md
+++ b/phoenix5-spark3/README.md
@@ -64,7 +64,10 @@ Additionally, you must add the hbase mapredcp libraries and the hbase
 NOTE:
 
 * Use the exact paths as appropriate to your system.
-* Set both `spark.driver.extraClassPath` and `spark.executor.extraClassPath` properties to the aforementioned classpath. You can add them to the `spark-defaults.conf`, or specify them in the `spark-shell` or `spark-submit` command line utilities.
+* Set both `spark.driver.extraClassPath` and `spark.executor.extraClassPath`
+ properties to the aforementioned classpath. You can add them to the
+ `spark-defaults.conf`,
+or specify them in the `spark-shell` or `spark-submit` command line utilities.
 
 ## Reading Phoenix Tables
 
@@ -363,7 +366,11 @@ from pyspark.sql import SparkSession
 from pyspark.sql.types import LongType, Row, StringType, StructField, StructType
 
 ss = SparkSession.builder.appName("phoenix-test").getOrCreate()
-schema = StructType([StructField("ID", LongType()), StructField("COL1", StringType()), StructField("COL2", LongType())])
+schema = StructType([
+    StructField("ID", LongType()),
+    StructField("COL1", StringType()),
+    StructField("COL2", LongType())
+])
 dataSet = [Row(1, "1", 1),Row(2, "2", 2), Row(3, "3", 3)]
 
 rdd = ss.sparkContext.parallelize(dataSet)
 df = ss.createDataFrame(rdd, schema)
@@ -381,8 +388,10 @@ optionally specifying a `conf` Hadoop configuration parameter with custom Phoeni
 as well as an optional `zkUrl` parameter for the Phoenix connection URL.
 * If `zkUrl` isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set
 in the `conf` parameter. Similarly, if no configuration is passed in, `zkUrl` must be specified.
-* As of [PHOENIX-5197](https://issues.apache.org/jira/browse/PHOENIX-5197), you can pass configurations from the driver
-to executors as a comma-separated list against the key `phoenixConfigs` i.e (PhoenixDataSource.PHOENIX_CONFIGS), for ex:
+* As of [PHOENIX-5197](https://issues.apache.org/jira/browse/PHOENIX-5197),
+ you can pass configurations from the driver
+ to executors as a comma-separated list against the key `phoenixConfigs`
+ i.e (PhoenixDataSource.PHOENIX_CONFIGS), for ex:
 
 ```scala
 df = spark
@@ -393,9 +402,11 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho
   .load;
 ```
 
-  This list of properties is parsed and populated into a properties map which is passed to `DriverManager.getConnection(connString, propsMap)`.
-  Note that the same property values will be used for both the driver and all executors and
-  these configurations are used each time a connection is made (both on the driver and executors).
+  This list of properties is parsed and populated into a properties map
+  which is passed to `DriverManager.getConnection(connString, propsMap)`.
+  Note that the same property values will be used for both the driver
+  and all executors and these configurations are used each time a connection
+  is made (both on the driver and executors).
 
 ## Limitations
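+For completeness, the `phoenixConfigs` example from the Notes section above could look roughly like this in PySpark. This is an illustrative, untested sketch that reuses the same table name, zkUrl, and option keys as the Scala snippet:
+
+```python
+from pyspark.sql import SparkSession
+
+ss = SparkSession.builder.appName("phoenix-configs-example").getOrCreate()
+
+# Pass HBase/Phoenix client settings from the driver to executors via the
+# comma-separated `phoenixConfigs` option (PhoenixDataSource.PHOENIX_CONFIGS).
+df = (ss.read.format("phoenix")
+    .option("table", "Table1")
+    .option("zkUrl", "phoenix-server:2181")
+    .option("phoenixConfigs", "hbase.client.retries.number=10,hbase.client.pause=10000")
+    .load())
+```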