Commit

PHOENIX-7377 Add option not to add CF to the Spark Column name in Spark Connector (#134)

rejeb authored Aug 15, 2024
1 parent 4c880e2 commit a1c298c
Showing 11 changed files with 153 additions and 14 deletions.
11 changes: 11 additions & 0 deletions phoenix5-spark/README.md
@@ -293,6 +293,17 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho
Note that the same property values will be used for both the driver and all executors and
these configurations are used each time a connection is made (both on the driver and executors).

- As of [PHOENIX-7377](https://issues.apache.org/jira/browse/PHOENIX-7377), you can pass a boolean parameter to avoid mapping columns in non-default column families
to `columnFamily.columnName` by setting the key `doNotMapColumnFamily` to `true` (default: `false`). For example (a sketch of the resulting schema follows this example):
```scala
val df = spark
  .sqlContext
  .read
  .format("phoenix")
  .options(Map("table" -> "Table1", "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181", "doNotMapColumnFamily" -> "true"))
  .load
```
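
A sketch of the effect on the inferred schema, assuming a table with a non-default `DATA` column family holding a `COL1` column (mirroring the `TABLE_WITH_COL_FAMILY` table used in the integration tests of this change):

```scala
// Illustrative sketch, assuming a table created roughly as:
//   CREATE TABLE TABLE_WITH_COL_FAMILY (ID BIGINT NOT NULL PRIMARY KEY, DATA.COL1 VARCHAR)

// Default behaviour: columns in non-default families keep the family prefix.
val mapped = spark.read.format("phoenix")
  .options(Map("table" -> "TABLE_WITH_COL_FAMILY",
    "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181"))
  .load
mapped.printSchema()
// root
//  |-- ID: long (nullable = true)
//  |-- DATA.COL1: string (nullable = true)

// With doNotMapColumnFamily = true: the family prefix is dropped.
val unmapped = spark.read.format("phoenix")
  .options(Map("table" -> "TABLE_WITH_COL_FAMILY",
    "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181",
    "doNotMapColumnFamily" -> "true"))
  .load
unmapped.printSchema()
// root
//  |-- ID: long (nullable = true)
//  |-- COL1: string (nullable = true)
```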

## Limitations

- Basic support for column and predicate pushdown using the Data Source API

3 changes: 3 additions & 0 deletions phoenix5-spark/src/it/resources/globalSetup.sql
@@ -64,3 +64,6 @@ CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT NULL, ORGANIZATION_ID
CREATE TABLE IF NOT EXISTS GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED_TIMESTAMP,varchar_id VARCHAR (30),char_id CHAR (30),binary_id BINARY (100),varbinary_id VARBINARY (100))
CREATE TABLE IF NOT EXISTS OUTPUT_GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED_TIMESTAMP,varchar_id VARCHAR (30),char_id CHAR (30),binary_id BINARY (100),varbinary_id VARBINARY (100))
UPSERT INTO GIGANTIC_TABLE VALUES(0,2,3,4,-5,6,7,8,9.3,10.4,11.5,12.6,13.7,true,null,null,CURRENT_TIME(),CURRENT_TIME(),CURRENT_DATE(),CURRENT_TIME(),'This is random textA','a','a','a')

CREATE TABLE table_with_col_family (id BIGINT NOT NULL PRIMARY KEY, data.col1 VARCHAR)
UPSERT INTO table_with_col_family (id, col1) VALUES (1, 'test_row_1')

@@ -913,4 +913,58 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
df.count() shouldEqual 1
}

test("Skip column family name when converting schema") {
val phoenixSchema = List(new ColumnInfo("columnFamily.columnName", PVarchar.INSTANCE.getSqlType))

val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema, doNotMapColumnFamily = true)

val expected = new StructType(List(StructField("columnName", StringType, nullable = true)).toArray)

catalystSchema shouldEqual expected
}

test("Do not skip column family name when converting schema\"") {
val phoenixSchema = List(new ColumnInfo("columFamily.columnName", PVarchar.INSTANCE.getSqlType))

val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema)

val expected = new StructType(List(StructField("columFamily.columnName", StringType, nullable = true)).toArray)

catalystSchema shouldEqual expected
}

test("Can read data and map column to columnName") {
val df = spark.read.format("phoenix")
.options(
Map("table" -> SchemaUtil.getEscapedArgument("TABLE_WITH_COL_FAMILY"),
"doNotMapColumnFamily" -> "true",
PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load

val schema = df.schema

val expected = new StructType(List(
StructField("ID", LongType, nullable = true),
StructField("COL1", StringType, nullable = true)
).toArray)

schema shouldEqual expected
}

test("Can read data and map column to colFamily.columnName") {
val df = spark.read.format("phoenix")
.options(
Map("table" -> SchemaUtil.getEscapedArgument("TABLE_WITH_COL_FAMILY"),
"doNotMapColumnFamily" -> "false",
PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load

val schema = df.schema

val expected = new StructType(List(
StructField("ID", LongType, nullable = true),
StructField("DATA.COL1", StringType, nullable = true)
).toArray)

schema shouldEqual expected
}

}

@@ -68,6 +68,7 @@ public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDo
private final String jdbcUrl;
private final boolean dateAsTimestamp;
private final Properties overriddenProps;
private final boolean doNotMapColumnFamily;

private StructType schema;
private Filter[] pushedFilters = new Filter[]{};
@@ -83,6 +84,7 @@ public PhoenixDataSourceReader(DataSourceOptions options) {
this.tableName = options.tableName().get();
this.jdbcUrl = PhoenixDataSource.getJdbcUrlFromOptions(options);
this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
this.doNotMapColumnFamily = options.getBoolean("doNotMapColumnFamily", false);
this.overriddenProps = PhoenixDataSource.extractPhoenixHBaseConfFromOptions(options);
setSchema();
}
@@ -94,7 +96,7 @@ private void setSchema() {
try (Connection conn = DriverManager.getConnection(jdbcUrl, overriddenProps)) {
List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp, doNotMapColumnFamily);
}
catch (SQLException e) {
throw new RuntimeException(e);

@@ -33,15 +33,16 @@ StructType, TimestampType}

object SparkSchemaUtil {

def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false) : StructType = {
def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false, doNotMapColumnFamily: Boolean = false): StructType = {
val structFields = columnList.map(ci => {
val structType = phoenixTypeToCatalystType(ci, dateAsTimestamp)
StructField(normalizeColumnName(ci.getColumnName), structType)
val columnName = normalizeColumnName(ci.getColumnName, doNotMapColumnFamily)
StructField(columnName, structType)
})
new StructType(structFields.toArray)
}

def normalizeColumnName(columnName: String) = {
private def normalizeColumnName(columnName: String, doNotMapColumnFamily: Boolean ) = {
val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName)
var normalizedColumnName = ""
if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) {
@@ -50,14 +51,14 @@ object SparkSchemaUtil {
else {
// split by separator to get the column family and column name
val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX)
normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName
normalizedColumnName = if (tokens(0) == "0" || doNotMapColumnFamily) tokens(1) else unescapedColumnName
}
normalizedColumnName
}
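// Illustration added for clarity (not part of this change): given the logic above,
//   "ID"          -> "ID"          (no column family separator)
//   "0.COL1"      -> "COL1"        (default column family "0" is always stripped)
//   "DATA.COL1"   -> "DATA.COL1"   when doNotMapColumnFamily = false (default)
//   "DATA.COL1"   -> "COL1"        when doNotMapColumnFamily = true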


// Lookup table for Phoenix types to Spark catalyst types
def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match {
private def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match {
case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType
case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType
case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType

10 changes: 10 additions & 0 deletions phoenix5-spark3/README.md
@@ -302,6 +302,16 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho
Note that the same property values will be used for both the driver and all executors and
these configurations are used each time a connection is made (both on the driver and executors).

- As of [PHOENIX-7377](https://issues.apache.org/jira/browse/PHOENIX-7377), you can pass a boolean parameter to avoid mapping columns in non-default column families
to `columnFamily.columnName` by setting the key `doNotMapColumnFamily` to `true` (default: `false`). For example (a brief illustration of the benefit follows this example):
```scala
val df = spark
  .sqlContext
  .read
  .format("phoenix")
  .options(Map("table" -> "Table1", "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181", "doNotMapColumnFamily" -> "true"))
  .load
```
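
A rough, hypothetical illustration of why this can be convenient: Spark parses a dotted name such as `DATA.COL1` as a nested field reference unless it is backtick-escaped, while the unmapped name can be used directly. The sketch assumes a table with a non-default `DATA` column family, as in the integration tests added by this change:

```scala
// Sketch only; assumes a Phoenix table with a non-default DATA column family:
//   CREATE TABLE TABLE_WITH_COL_FAMILY (ID BIGINT NOT NULL PRIMARY KEY, DATA.COL1 VARCHAR)
val df = spark.read.format("phoenix")
  .options(Map("table" -> "TABLE_WITH_COL_FAMILY",
    "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181",
    "doNotMapColumnFamily" -> "true"))
  .load

// The column surfaces as plain COL1, so it can be referenced without escaping:
df.select("COL1").show()

// Without the option the same column is named DATA.COL1 and must be
// backtick-escaped to avoid being parsed as a nested field reference:
// df.select("`DATA.COL1`").show()
```
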
## Limitations

- Basic support for column and predicate pushdown using the Data Source API

@@ -48,7 +48,7 @@ public StructType inferSchema(CaseInsensitiveStringMap options) {
try (Connection conn = DriverManager.getConnection(jdbcUrl, overriddenProps)) {
List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp, false);
}
catch (SQLException e) {
throw new RuntimeException(e);

3 changes: 3 additions & 0 deletions phoenix5-spark3/src/it/resources/globalSetup.sql
@@ -64,3 +64,6 @@ CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT NULL, ORGANIZATION_ID
CREATE TABLE IF NOT EXISTS GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED_TIMESTAMP,varchar_id VARCHAR (30),char_id CHAR (30),binary_id BINARY (100),varbinary_id VARBINARY (100))
CREATE TABLE IF NOT EXISTS OUTPUT_GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED_TIMESTAMP,varchar_id VARCHAR (30),char_id CHAR (30),binary_id BINARY (100),varbinary_id VARBINARY (100))
UPSERT INTO GIGANTIC_TABLE VALUES(0,2,3,4,-5,6,7,8,9.3,10.4,11.5,12.6,13.7,true,null,null,CURRENT_TIME(),CURRENT_TIME(),CURRENT_DATE(),CURRENT_TIME(),'This is random textA','a','a','a')

CREATE TABLE table_with_col_family (id BIGINT NOT NULL PRIMARY KEY, data.col1 VARCHAR)
UPSERT INTO table_with_col_family (id, col1) VALUES (1, 'test_row_1')

@@ -13,7 +13,6 @@
*/
package org.apache.phoenix.spark

import org.apache.omid.tso.client.AbortException

import java.sql.DriverManager
import java.util.Date
@@ -916,4 +915,58 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
df.count() shouldEqual 1
}

test("Skip column family name when converting schema") {
val phoenixSchema = List(new ColumnInfo("columnFamily.columnName", PVarchar.INSTANCE.getSqlType))

val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema, doNotMapColumnFamily = true)

val expected = new StructType(List(StructField("columnName", StringType, nullable = true)).toArray)

catalystSchema shouldEqual expected
}

test("Do not skip column family name when converting schema\"") {
val phoenixSchema = List(new ColumnInfo("columFamily.columnName", PVarchar.INSTANCE.getSqlType))

val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema)

val expected = new StructType(List(StructField("columFamily.columnName", StringType, nullable = true)).toArray)

catalystSchema shouldEqual expected
}

test("Can read data and map column to columnName") {
val df = spark.read.format("phoenix")
.options(
Map("table" -> SchemaUtil.getEscapedArgument("TABLE_WITH_COL_FAMILY"),
"doNotMapColumnFamily" -> "true",
PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load

val schema = df.schema

val expected = new StructType(List(
StructField("ID", LongType, nullable = true),
StructField("COL1", StringType, nullable = true)
).toArray)

schema shouldEqual expected
}

test("Can read data and map column to colFamily.columnName") {
val df = spark.read.format("phoenix")
.options(
Map("table" -> SchemaUtil.getEscapedArgument("TABLE_WITH_COL_FAMILY"),
"doNotMapColumnFamily" -> "false",
PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load

val schema = df.schema

val expected = new StructType(List(
StructField("ID", LongType, nullable = true),
StructField("DATA.COL1", StringType, nullable = true)
).toArray)

schema shouldEqual expected
}

}

@@ -64,6 +64,7 @@ public StructType inferSchema(CaseInsensitiveStringMap options){
String tableName = options.get("table");
String tenant = options.get(PhoenixRuntime.TENANT_ID_ATTRIB);
boolean dateAsTimestamp = Boolean.parseBoolean(options.getOrDefault("dateAsTimestamp", Boolean.toString(false)));
boolean doNotMapColumnFamily = Boolean.parseBoolean(options.getOrDefault("doNotMapColumnFamily", Boolean.toString(false)));
Properties overriddenProps = extractPhoenixHBaseConfFromOptions(options);
if (tenant != null) {
overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenant);
@@ -75,7 +76,7 @@ public StructType inferSchema(CaseInsensitiveStringMap options){
try (Connection conn = DriverManager.getConnection(jdbcUrl, overriddenProps)) {
List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp, doNotMapColumnFamily);
}
catch (SQLException e) {
throw new RuntimeException(e);

@@ -33,15 +33,16 @@ StructType, TimestampType}

object SparkSchemaUtil {

def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false) : StructType = {
def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false, doNotMapColumnFamily: Boolean = false): StructType = {
val structFields = columnList.map(ci => {
val structType = phoenixTypeToCatalystType(ci, dateAsTimestamp)
StructField(normalizeColumnName(ci.getColumnName), structType)
val columnName = normalizeColumnName(ci.getColumnName, doNotMapColumnFamily)
StructField(columnName, structType)
})
new StructType(structFields.toArray)
}

def normalizeColumnName(columnName: String) = {
private def normalizeColumnName(columnName: String, doNotMapColumnFamily: Boolean ) = {
val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName)
var normalizedColumnName = ""
if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) {
@@ -50,14 +51,14 @@ object SparkSchemaUtil {
else {
// split by separator to get the column family and column name
val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX)
normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName
normalizedColumnName = if (tokens(0) == "0" || doNotMapColumnFamily) tokens(1) else unescapedColumnName
}
normalizedColumnName
}


// Lookup table for Phoenix types to Spark catalyst types
def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match {
private def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match {
case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType
case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType
case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType