PHOENIX-6783 Add spark-sql for spark2 and spark3
rejeb committed Aug 18, 2024
1 parent a1c298c commit 0c19440
Showing 57 changed files with 1,077 additions and 2,373 deletions.
193 changes: 87 additions & 106 deletions phoenix5-spark/README.md
@@ -18,6 +18,18 @@ limitations under the License.
phoenix-spark extends Phoenix's MapReduce support to allow Spark to load Phoenix tables as DataFrames,
and enables persisting DataFrames back to Phoenix.

## Configuration properties

| Name | Default | Usage | Description |
|------|---------|-------|-------------|
| table | empty | R/W | Table name, as `namespace.table_name` |
| zkUrl | empty | R/W | ZooKeeper quorum, as `zkHost:zkPort`. Deprecated, use `jdbcUrl` instead |
| jdbcUrl | empty | R/W | JDBC URL of the database, as `jdbc:phoenix:zkHost:zkPort` |
| dateAsTimestamp | false | R | Cast Date columns to Timestamp |
| doNotMapColumnFamily | false | R | For non-default column families: do not prefix column names with the column family name |
| tenantId | empty | R/W | Tenant id to use when reading from a multi-tenant table |
| phoenixConfigs | empty | R/W | Comma-separated list of HBase/Phoenix configuration overrides (`property=value,property=value`) |
| skipNormalizingIdentifier | empty | W | Skip normalizing identifiers when writing |
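
Several of these options can be combined on a single read or write. Below is a minimal sketch (not taken from this commit; the table, hosts and tenant id are placeholder values):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("phoenix-options-sketch").getOrCreate()

val df = spark.read
  .format("phoenix")
  .option("table", "MY_SCHEMA.MY_TABLE")            // namespace.table_name
  .option("jdbcUrl", "jdbc:phoenix:zkHost:zkPort")  // preferred over the deprecated zkUrl
  .option("dateAsTimestamp", "true")                // read DATE columns as TIMESTAMP
  .option("doNotMapColumnFamily", "true")           // do not prefix columns with their column family
  .option("tenantId", "tenant1")                    // only meaningful for multi-tenant tables
  .load()
```

Write-side options (Usage `W`) are passed the same way on `df.write`.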

## Reading Phoenix Tables

Given a Phoenix table with the following DDL and DML:
@@ -45,7 +57,7 @@ val spark = SparkSession
val df = spark.sqlContext
.read
.format("phoenix")
.options(Map("table" -> "TABLE1"))
.options(Map("table" -> "TABLE1", "jdbcUrl" -> "jdbc:phoenix:zkHost:zkport"))
.load

df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
@@ -65,21 +77,67 @@ public class PhoenixSparkRead {
public static void main() throws Exception {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
.set("spark.hadoopRDD.ignoreEmptySplits", "false");
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
- SQLContext sqlContext = new SQLContext(jsc);
+ SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

// Load data from TABLE1
- Dataset<Row> df = sqlContext
+ Dataset<Row> df = spark
.read()
.format("phoenix")
.option("table", "TABLE1")
.option("jdbcUrl", "jdbc:phoenix:zkHost:zkport")
.load();
df.createOrReplaceTempView("TABLE1");

- SQLContext sqlCtx = new SQLContext(jsc);
- df = sqlCtx.sql("SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1L");
+ df = spark.sql("SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1L");
df.show();
}
}
```

### Load as a DataFrame using Spark SQL and the DataSourceV2 API
Scala example:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("phoenix-test")
.master("local")
.config("spark.hadoopRDD.ignoreEmptySplits", "false")
.getOrCreate()

// Load data from TABLE1
spark.sql("CREATE TABLE TABLE1_SQL USING phoenix OPTIONS ('table' 'TABLE1', 'jdbcUrl' 'jdbc:phoenix:zkHost:zkport')")

val df = spark.sql("SELECT ID FROM TABLE1_SQL WHERE COL1='test_row_1'")

df.show

```
Java example:
```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PhoenixSparkRead {

public static void main() throws Exception {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
.set("spark.hadoopRDD.ignoreEmptySplits", "false");
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

// Load data from TABLE1
Dataset<Row> df = spark
.sql("CREATE TABLE TABLE1_SQL USING phoenix " +
"OPTIONS ('table' 'TABLE1', 'jdbcUrl' 'jdbc:phoenix:zkHost:zkport')");


df = spark.sql("SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1L");
df.show();
spark.stop();
}
}
```
@@ -89,7 +147,7 @@ public class PhoenixSparkRead {
### Save DataFrames to Phoenix using DataSourceV2

The `save` method on a DataFrame allows passing in a data source type. You can use
- `phoenix` for DataSourceV2 and must also pass in a `table` and `zkUrl` parameter to
+ `phoenix` for DataSourceV2 and must also pass in a `table` and `jdbcUrl` parameter to
specify which table and server to persist the DataFrame to. The column names are derived from
the DataFrame's schema field names, and must match the Phoenix column names.
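
If the DataFrame's field names do not already match the Phoenix columns, they can be renamed before saving. A minimal sketch, assuming a hypothetical input field `USER_NAME` that has to land in the Phoenix column `COL1`:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local").appName("phoenix-rename-sketch").getOrCreate()

// A DataFrame whose field name does not match the target Phoenix column.
val df = spark.createDataFrame(Seq((10L, "alice"))).toDF("ID", "USER_NAME")

// Rename the field so it matches the Phoenix column, then save.
df.withColumnRenamed("USER_NAME", "COL1")
  .write
  .format("phoenix")
  .option("table", "OUTPUT_TABLE")
  .option("jdbcUrl", "jdbc:phoenix:zkHost:zkPort")
  .mode(SaveMode.Overwrite)
  .save()
```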

@@ -118,14 +176,16 @@ val spark = SparkSession
val df = spark.sqlContext
.read
.format("phoenix")
.options(Map("table" -> "INPUT_TABLE"))
.option("table", "INPUT_TABLE")
.option("jdbcUrl", "jdbc:phoenix:zkHost:zkport")
.load

// Save to OUTPUT_TABLE
df.write
.format("phoenix")
- .mode(SaveMode.Overwrite)
- .options(Map("table" -> "OUTPUT_TABLE"))
+ .mode(SaveMode.Append)
+ .option("table", "OUTPUT_TABLE")
+ .option("jdbcUrl", "jdbc:phoenix:zkHost:zkport")
.save()
```
Java example:
@@ -141,32 +201,32 @@ public class PhoenixSparkWriteFromInputTable {

public static void main() throws Exception {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
.set("spark.hadoopRDD.ignoreEmptySplits", "false");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(jsc);
.set("spark.hadoopRDD.ignoreEmptySplits", "false");
SparkSessinon spark = SparkSession.builder().config(sparkConf).getOrCreate();

// Load INPUT_TABLE
- Dataset<Row> df = sqlContext
+ Dataset<Row> df = spark
.read()
.format("phoenix")
.option("table", "INPUT_TABLE")
.option("jdbcUrl", "jdbc:phoenix:zkHost:zkport")
.load();

// Save to OUTPUT_TABLE
df.write()
.format("phoenix")
.mode(SaveMode.Overwrite)
.option("table", "OUTPUT_TABLE")
.option("jdbcUrl", "jdbc:phoenix:zkHost:zkport")
.save();
- jsc.stop();
}
}
```

### Save from an external RDD with a schema to a Phoenix table

Just like the previous example, you can pass in the data source type as `phoenix` and specify the `table` and
- `zkUrl` parameters indicating which table and server to persist the DataFrame to.
+ `jdbcUrl` parameters indicating which table and server to persist the DataFrame to.

Note that the schema of the RDD must match its column data and this must match the schema of the Phoenix table
that you save to.
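
For instance, for a table declared as `(ID BIGINT NOT NULL PRIMARY KEY, COL1 VARCHAR, COL2 INTEGER)`, a matching Spark schema could look like this (a sketch; the nullability flags are illustrative):

```scala
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}

// BIGINT -> LongType, VARCHAR -> StringType, INTEGER -> IntegerType
val schema = StructType(Seq(
  StructField("ID", LongType, nullable = false),
  StructField("COL1", StringType, nullable = true),
  StructField("COL2", IntegerType, nullable = true)))
```
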
@@ -200,11 +260,12 @@ val schema = StructType(
val rowRDD = spark.sparkContext.parallelize(dataSet)

// Apply the schema to the RDD.
- val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+ val df = spark.createDataFrame(rowRDD, schema)

df.write
.format("phoenix")
.options(Map("table" -> "OUTPUT_TABLE"))
.option("table", "OUTPUT_TABLE")
.option("jdbcUrl", "jdbc:phoenix:zkHost:zkport")
.mode(SaveMode.Overwrite)
.save()
```
@@ -230,10 +291,7 @@ public class PhoenixSparkWriteFromRDDWithSchema {
public static void main() throws Exception {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
.set("spark.hadoopRDD.ignoreEmptySplits", "false");
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
- SQLContext sqlContext = new SQLContext(jsc);
- SparkSession spark = sqlContext.sparkSession();
- Dataset<Row> df;
+ SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

// Generate the schema based on the fields
List<StructField> fields = new ArrayList<>();
@@ -249,14 +307,13 @@ public class PhoenixSparkWriteFromRDDWithSchema {
}

// Create a DataFrame from the rows and the specified schema
- df = spark.createDataFrame(rows, schema);
+ Dataset<Row> df = spark.createDataFrame(rows, schema);
df.write()
.format("phoenix")
.mode(SaveMode.Overwrite)
.option("table", "OUTPUT_TABLE")
.option("jdbcUrl", "jdbc:phoenix:zkHost:zkport")
.save();

- jsc.stop();
}
}
```
@@ -269,14 +326,10 @@ the deprecated `zkUrl` parameter for backwards compatibility purposes. If neither
it falls back to using the connection defined in hbase-site.xml.
- `"jdbcUrl"` expects a full Phoenix JDBC URL, i.e. "jdbc:phoenix" or "jdbc:phoenix:zkHost:zkport",
while `"zkUrl"` expects the ZK quorum only, i.e. "zkHost:zkPort"
- - If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"`
-   instead of `"phoenix"`, however this is deprecated as of `connectors-1.0.0`.
-   The `"org.apache.phoenix.spark"` datasource does not accept the `"jdbcUrl"` parameter,
-   only `"zkUrl"`
+ - DataSourceV1 is no longer supported as of `connectors-6.0.0`.
+   Use `"phoenix"` instead of the `"org.apache.phoenix.spark"` datasource.
  - The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and
-   `saveToPhoenix` use the deprecated `"org.apache.phoenix.spark"` datasource, and allow
-   optionally specifying a `conf` Hadoop configuration parameter with custom Phoenix client settings,
-   as well as an optional `zkUrl` parameter.
+   `saveToPhoenix` are removed as of `connectors-6.0.0`.

- As of [PHOENIX-5197](https://issues.apache.org/jira/browse/PHOENIX-5197), you can pass configurations from the driver
to executors as a comma-separated list against the key `phoenixConfigs`, i.e. `PhoenixDataSource.PHOENIX_CONFIGS`, for example:
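
A sketch of such a read (the two HBase client settings are only illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("phoenix-configs-sketch").getOrCreate()

// Forward HBase/Phoenix client settings from the driver to the executors.
val df = spark.read
  .format("phoenix")
  .options(Map(
    "table" -> "TABLE1",
    "jdbcUrl" -> "jdbc:phoenix:zkHost:zkPort",
    "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=1000"))
  .load()
```
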
@@ -300,7 +353,7 @@
.sqlContext
.read
.format("phoenix")
.options(Map("table" -> "Table1", "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181", "doNotMapColumnFamily" -> "true"))
.options(Map("table" -> "Table1", "jdbcUrl" -> "jdbc:phoenix:zkHost:zkport", "doNotMapColumnFamily" -> "true"))
.load;
```

@@ -310,75 +363,3 @@
- The Data Source API does not support passing custom Phoenix settings in configuration; you must
create the DataFrame or RDD directly if you need fine-grained configuration.
- No support for aggregate or distinct functions (http://phoenix.apache.org/phoenix_mr.html)

## Deprecated Usages

### Load as a DataFrame directly using a Configuration object
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._

val configuration = new Configuration()
// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'

val sparkConf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
val sc = new SparkContext("local", "phoenix-test", sparkConf)
val sqlContext = new SQLContext(sc)

// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
val df = sqlContext.phoenixTableAsDataFrame(
"TABLE1", Array("ID", "COL1"), conf = configuration
)

df.show
```

### Load as an RDD, using a Zookeeper URL
```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._
import org.apache.spark.rdd.RDD

val sparkConf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
val sc = new SparkContext("local", "phoenix-test", sparkConf)

// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
"TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
)

rdd.count()

val firstId = rdd.first()("ID").asInstanceOf[Long]
val firstCol = rdd.first()("COL1").asInstanceOf[String]
```

### Saving RDDs to Phoenix

`saveToPhoenix` is an implicit method on RDD[Product], or an RDD of Tuples. The data types must
correspond to the Java types Phoenix supports (http://phoenix.apache.org/language/datatypes.html)

Given a Phoenix table with the following DDL:

```sql
CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
```

```scala
import org.apache.spark.SparkContext
import org.apache.phoenix.spark._

val sc = new SparkContext("local", "phoenix-test")
val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))

sc
.parallelize(dataSet)
.saveToPhoenix(
"OUTPUT_TEST_TABLE",
Seq("ID","COL1","COL2"),
zkUrl = Some("phoenix-server:2181")
)
```
@@ -148,7 +148,7 @@ public void basicWriteAndReadBackTest() throws SQLException {

Dataset df1Read = spark.read().format("phoenix")
.option("table", tableName)
- .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+ .option(PhoenixDataSource.JDBC_URL, getUrl()).load();

assertEquals(3l, df1Read.count());

@@ -173,7 +173,6 @@ public void basicWriteAndReadBackTest() throws SQLException {
}

@Test
- @Ignore // Spark3 seems to be unable to handle mixed case colum names
public void lowerCaseWriteTest() throws SQLException {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
.set("spark.hadoopRDD.ignoreEmptySplits", "false");
@@ -205,7 +204,8 @@ public void lowerCaseWriteTest() throws SQLException {
.format("phoenix")
.mode(SaveMode.Overwrite)
.option("table", tableName)
- .option(ZOOKEEPER_URL, getUrl())
+ .option(PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER,"true")
+ .option(JDBC_URL, getUrl())
.save();

try (Connection conn = DriverManager.getConnection(getUrl());