From a1c298c2dc35c2db9f0ebf8235891564e9b88d2b Mon Sep 17 00:00:00 2001 From: rejeb ben rejeb Date: Thu, 15 Aug 2024 17:50:11 +0200 Subject: [PATCH] PHOENIX-7377 Add option not to add CF to the Spark Column name in Spark Connector (#134) --- phoenix5-spark/README.md | 11 ++++ .../src/it/resources/globalSetup.sql | 3 + .../apache/phoenix/spark/PhoenixSparkIT.scala | 54 ++++++++++++++++++ .../v2/reader/PhoenixDataSourceReader.java | 4 +- .../phoenix/spark/SparkSchemaUtil.scala | 11 ++-- phoenix5-spark3/README.md | 10 ++++ .../connector/PhoenixTestingDataSource.java | 2 +- .../src/it/resources/globalSetup.sql | 3 + .../apache/phoenix/spark/PhoenixSparkIT.scala | 55 ++++++++++++++++++- .../sql/connector/PhoenixDataSource.java | 3 +- .../phoenix/spark/SparkSchemaUtil.scala | 11 ++-- 11 files changed, 153 insertions(+), 14 deletions(-) diff --git a/phoenix5-spark/README.md b/phoenix5-spark/README.md index 73d68c20..3542a19d 100644 --- a/phoenix5-spark/README.md +++ b/phoenix5-spark/README.md @@ -293,6 +293,17 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho Note that the same property values will be used for both the driver and all executors and these configurations are used each time a connection is made (both on the driver and executors). +- As of [PHOENIX-7377](https://issues.apache.org/jira/browse/PHOENIX-7377), you can pass boolean parameter to avoid mapping + non default family columns to `columnFamily.columnName` by setting the key `doNotMapColumnFamily` to `true` (default value: `false`), for ex: + ```scala + df = spark + .sqlContext + .read + .format("phoenix") + .options(Map("table" -> "Table1", "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181", "doNotMapColumnFamily" -> "true")) + .load; + ``` + ## Limitations - Basic support for column and predicate pushdown using the Data Source API diff --git a/phoenix5-spark/src/it/resources/globalSetup.sql b/phoenix5-spark/src/it/resources/globalSetup.sql index 8a3a4c29..aa5a81f0 100644 --- a/phoenix5-spark/src/it/resources/globalSetup.sql +++ b/phoenix5-spark/src/it/resources/globalSetup.sql @@ -64,3 +64,6 @@ CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT NULL, ORGANIZATION_ID CREATE TABLE IF NOT EXISTS GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED_TIMESTAMP,varchar_id VARCHAR (30),char_id CHAR (30),binary_id BINARY (100),varbinary_id VARBINARY (100)) CREATE TABLE IF NOT EXISTS OUTPUT_GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED_TIMESTAMP,varchar_id VARCHAR (30),char_id CHAR (30),binary_id BINARY (100),varbinary_id VARBINARY (100)) UPSERT INTO GIGANTIC_TABLE VALUES(0,2,3,4,-5,6,7,8,9.3,10.4,11.5,12.6,13.7,true,null,null,CURRENT_TIME(),CURRENT_TIME(),CURRENT_DATE(),CURRENT_TIME(),'This is random textA','a','a','a') + +CREATE TABLE table_with_col_family (id BIGINT NOT NULL PRIMARY KEY, data.col1 VARCHAR) +UPSERT INTO table_with_col_family (id, col1) VALUES (1, 'test_row_1') diff --git a/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala index 464a5881..f1a09d64 100644 --- a/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala +++ b/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala @@ -913,4 +913,58 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { df.count() shouldEqual 1 } + test("Skip column family name when converting schema") { + val phoenixSchema = List(new ColumnInfo("columFamily.columnName", PVarchar.INSTANCE.getSqlType)) + + val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema, doNotMapColumnFamily = true) + + val expected = new StructType(List(StructField("columnName", StringType, nullable = true)).toArray) + + catalystSchema shouldEqual expected + } + + test("Do not skip column family name when converting schema\"") { + val phoenixSchema = List(new ColumnInfo("columFamily.columnName", PVarchar.INSTANCE.getSqlType)) + + val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema) + + val expected = new StructType(List(StructField("columFamily.columnName", StringType, nullable = true)).toArray) + + catalystSchema shouldEqual expected + } + + test("Can read data and map column to columnName") { + val df = spark.read.format("phoenix") + .options( + Map("table" -> SchemaUtil.getEscapedArgument("TABLE_WITH_COL_FAMILY"), + "doNotMapColumnFamily" -> "true", + PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load + + val schema = df.schema + + val expected = new StructType(List( + StructField("ID", LongType, nullable = true), + StructField("COL1", StringType, nullable = true) + ).toArray) + + schema shouldEqual expected + } + + test("Can read data and map column to colFamily.columnName") { + val df = spark.read.format("phoenix") + .options( + Map("table" -> SchemaUtil.getEscapedArgument("TABLE_WITH_COL_FAMILY"), + "doNotMapColumnFamily" -> "false", + PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load + + val schema = df.schema + + val expected = new StructType(List( + StructField("ID", LongType, nullable = true), + StructField("DATA.COL1", StringType, nullable = true) + ).toArray) + + schema shouldEqual expected + } + } diff --git a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java index 84d83f39..22e0cfbe 100644 --- a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java +++ b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java @@ -68,6 +68,7 @@ public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDo private final String jdbcUrl; private final boolean dateAsTimestamp; private final Properties overriddenProps; + private final boolean doNotMapColumnFamily; private StructType schema; private Filter[] pushedFilters = new Filter[]{}; @@ -83,6 +84,7 @@ public PhoenixDataSourceReader(DataSourceOptions options) { this.tableName = options.tableName().get(); this.jdbcUrl = PhoenixDataSource.getJdbcUrlFromOptions(options); this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false); + this.doNotMapColumnFamily = options.getBoolean("doNotMapColumnFamily", false); this.overriddenProps = PhoenixDataSource.extractPhoenixHBaseConfFromOptions(options); setSchema(); } @@ -94,7 +96,7 @@ private void setSchema() { try (Connection conn = DriverManager.getConnection(jdbcUrl, overriddenProps)) { List columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null); Seq columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq(); - schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp); + schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp, doNotMapColumnFamily); } catch (SQLException e) { throw new RuntimeException(e); diff --git a/phoenix5-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala b/phoenix5-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala index 26461ace..19535bfc 100644 --- a/phoenix5-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala +++ b/phoenix5-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala @@ -33,15 +33,16 @@ StructType, TimestampType} object SparkSchemaUtil { - def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false) : StructType = { + def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false, doNotMapColumnFamily: Boolean = false): StructType = { val structFields = columnList.map(ci => { val structType = phoenixTypeToCatalystType(ci, dateAsTimestamp) - StructField(normalizeColumnName(ci.getColumnName), structType) + val columnName = normalizeColumnName(ci.getColumnName, doNotMapColumnFamily) + StructField(columnName, structType) }) new StructType(structFields.toArray) } - def normalizeColumnName(columnName: String) = { + private def normalizeColumnName(columnName: String, doNotMapColumnFamily: Boolean ) = { val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName) var normalizedColumnName = "" if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) { @@ -50,14 +51,14 @@ object SparkSchemaUtil { else { // split by separator to get the column family and column name val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX) - normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName + normalizedColumnName = if (tokens(0) == "0" || doNotMapColumnFamily) tokens(1) else unescapedColumnName } normalizedColumnName } // Lookup table for Phoenix types to Spark catalyst types - def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match { + private def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match { case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType diff --git a/phoenix5-spark3/README.md b/phoenix5-spark3/README.md index ec7684ae..8954e168 100644 --- a/phoenix5-spark3/README.md +++ b/phoenix5-spark3/README.md @@ -302,6 +302,16 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho Note that the same property values will be used for both the driver and all executors and these configurations are used each time a connection is made (both on the driver and executors). +- As of [PHOENIX-7377](https://issues.apache.org/jira/browse/PHOENIX-7377), you can pass boolean parameter to avoid mapping + non default family columns to `columnFamily.columnName` by setting the key `doNotMapColumnFamily` to `true` (default value: `false`), for ex: + ```scala + df = spark + .sqlContext + .read + .format("phoenix") + .options(Map("table" -> "Table1", "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181", "doNotMapColumnFamily" -> "true")) + .load; + ``` ## Limitations - Basic support for column and predicate pushdown using the Data Source API diff --git a/phoenix5-spark3/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java b/phoenix5-spark3/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java index 4860154f..adfbe73a 100644 --- a/phoenix5-spark3/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java +++ b/phoenix5-spark3/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java @@ -48,7 +48,7 @@ public StructType inferSchema(CaseInsensitiveStringMap options) { try (Connection conn = DriverManager.getConnection(jdbcUrl, overriddenProps)) { List columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null); Seq columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq(); - schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp); + schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp, false); } catch (SQLException e) { throw new RuntimeException(e); diff --git a/phoenix5-spark3/src/it/resources/globalSetup.sql b/phoenix5-spark3/src/it/resources/globalSetup.sql index 8a3a4c29..6082727e 100644 --- a/phoenix5-spark3/src/it/resources/globalSetup.sql +++ b/phoenix5-spark3/src/it/resources/globalSetup.sql @@ -64,3 +64,6 @@ CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT NULL, ORGANIZATION_ID CREATE TABLE IF NOT EXISTS GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED_TIMESTAMP,varchar_id VARCHAR (30),char_id CHAR (30),binary_id BINARY (100),varbinary_id VARBINARY (100)) CREATE TABLE IF NOT EXISTS OUTPUT_GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED_TIMESTAMP,varchar_id VARCHAR (30),char_id CHAR (30),binary_id BINARY (100),varbinary_id VARBINARY (100)) UPSERT INTO GIGANTIC_TABLE VALUES(0,2,3,4,-5,6,7,8,9.3,10.4,11.5,12.6,13.7,true,null,null,CURRENT_TIME(),CURRENT_TIME(),CURRENT_DATE(),CURRENT_TIME(),'This is random textA','a','a','a') + +CREATE TABLE table_with_col_family (id BIGINT NOT NULL PRIMARY KEY, data.col1 VARCHAR) +UPSERT INTO table_with_col_family (id, col1) VALUES (1, 'test_row_1') \ No newline at end of file diff --git a/phoenix5-spark3/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix5-spark3/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala index 85f590d2..7a16b005 100644 --- a/phoenix5-spark3/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala +++ b/phoenix5-spark3/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala @@ -13,7 +13,6 @@ */ package org.apache.phoenix.spark -import org.apache.omid.tso.client.AbortException import java.sql.DriverManager import java.util.Date @@ -916,4 +915,58 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { df.count() shouldEqual 1 } + test("Skip column family name when converting schema") { + val phoenixSchema = List(new ColumnInfo("columFamily.columnName", PVarchar.INSTANCE.getSqlType)) + + val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema, doNotMapColumnFamily = true) + + val expected = new StructType(List(StructField("columnName", StringType, nullable = true)).toArray) + + catalystSchema shouldEqual expected + } + + test("Do not skip column family name when converting schema\"") { + val phoenixSchema = List(new ColumnInfo("columFamily.columnName", PVarchar.INSTANCE.getSqlType)) + + val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema) + + val expected = new StructType(List(StructField("columFamily.columnName", StringType, nullable = true)).toArray) + + catalystSchema shouldEqual expected + } + + test("Can read data and map column to columnName") { + val df = spark.read.format("phoenix") + .options( + Map("table" -> SchemaUtil.getEscapedArgument("TABLE_WITH_COL_FAMILY"), + "doNotMapColumnFamily" -> "true", + PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load + + val schema = df.schema + + val expected = new StructType(List( + StructField("ID", LongType, nullable = true), + StructField("COL1", StringType, nullable = true) + ).toArray) + + schema shouldEqual expected + } + + test("Can read data and map column to colFamily.columnName") { + val df = spark.read.format("phoenix") + .options( + Map("table" -> SchemaUtil.getEscapedArgument("TABLE_WITH_COL_FAMILY"), + "doNotMapColumnFamily" -> "false", + PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load + + val schema = df.schema + + val expected = new StructType(List( + StructField("ID", LongType, nullable = true), + StructField("DATA.COL1", StringType, nullable = true) + ).toArray) + + schema shouldEqual expected + } + } diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java index c3c0aded..d2a7410e 100644 --- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java +++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java @@ -64,6 +64,7 @@ public StructType inferSchema(CaseInsensitiveStringMap options){ String tableName = options.get("table"); String tenant = options.get(PhoenixRuntime.TENANT_ID_ATTRIB); boolean dateAsTimestamp = Boolean.parseBoolean(options.getOrDefault("dateAsTimestamp", Boolean.toString(false))); + boolean doNotMapColumnFamily = Boolean.parseBoolean(options.getOrDefault("doNotMapColumnFamily", Boolean.toString(false))); Properties overriddenProps = extractPhoenixHBaseConfFromOptions(options); if (tenant != null) { overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenant); @@ -75,7 +76,7 @@ public StructType inferSchema(CaseInsensitiveStringMap options){ try (Connection conn = DriverManager.getConnection(jdbcUrl, overriddenProps)) { List columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null); Seq columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq(); - schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp); + schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp, doNotMapColumnFamily); } catch (SQLException e) { throw new RuntimeException(e); diff --git a/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala b/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala index 26461ace..19535bfc 100644 --- a/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala +++ b/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala @@ -33,15 +33,16 @@ StructType, TimestampType} object SparkSchemaUtil { - def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false) : StructType = { + def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false, doNotMapColumnFamily: Boolean = false): StructType = { val structFields = columnList.map(ci => { val structType = phoenixTypeToCatalystType(ci, dateAsTimestamp) - StructField(normalizeColumnName(ci.getColumnName), structType) + val columnName = normalizeColumnName(ci.getColumnName, doNotMapColumnFamily) + StructField(columnName, structType) }) new StructType(structFields.toArray) } - def normalizeColumnName(columnName: String) = { + private def normalizeColumnName(columnName: String, doNotMapColumnFamily: Boolean ) = { val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName) var normalizedColumnName = "" if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) { @@ -50,14 +51,14 @@ object SparkSchemaUtil { else { // split by separator to get the column family and column name val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX) - normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName + normalizedColumnName = if (tokens(0) == "0" || doNotMapColumnFamily) tokens(1) else unescapedColumnName } normalizedColumnName } // Lookup table for Phoenix types to Spark catalyst types - def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match { + private def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match { case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType