# [SPARK-LLAP-135] Bump to HDP 2.6.1. (#136)
## What changes were proposed in this pull request?

This bumps the Hive and Hadoop dependencies to the HDP 2.6.1.0 builds for Spark-LLAP master/branch-2.1, together with small import-ordering and line-wrapping cleanups in the affected Scala sources.

## How was this patch tested?

- [x] Travis CI passed.
- [x] Ranger test passed (see the log below).

```
[hive@hdp26-6 python]$ python spark-ranger-secure-test.py
........................
----------------------------------------------------------------------
Ran 24 tests in 3723.090s

OK
```

This closes #135.
(cherry picked from commit c0cafa9)

Signed-off-by: Dongjoon Hyun <[email protected]>

Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun committed on Jun 3, 2017
Commit ec62d05 (1 parent: 20cf7f7)
Showing 4 changed files with 41 additions and 35 deletions.
Changes to build.sbt (2 additions, 2 deletions):
```diff
@@ -7,8 +7,8 @@ val scalatestVersion = "2.2.6"

 sparkVersion := sys.props.getOrElse("spark.version", "2.1.1")

-val hadoopVersion = sys.props.getOrElse("hadoop.version", "2.7.3")
-val hiveVersion = sys.props.getOrElse("hive.version", "2.1.0.2.6.0.3-8")
+val hadoopVersion = sys.props.getOrElse("hadoop.version", "2.7.3.2.6.1.0-129")
+val hiveVersion = sys.props.getOrElse("hive.version", "2.1.0.2.6.1.0-129")
 val log4j2Version = sys.props.getOrElse("log4j2.version", "2.4.1")
 val tezVersion = sys.props.getOrElse("tez.version", "0.8.4")
 val thriftVersion = sys.props.getOrElse("thrift.version", "0.9.3")
```
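Since build.sbt resolves each of these versions through `sys.props.getOrElse`, the new HDP 2.6.1.0 defaults can still be overridden at build time with `-D` system properties. A minimal sketch, assuming the standard sbt launcher (which forwards `-D` options to the JVM) and using the `test` task purely as an example:

```
sbt -Dspark.version=2.1.1 \
    -Dhadoop.version=2.7.3.2.6.1.0-129 \
    -Dhive.version=2.1.0.2.6.1.0-129 \
    test
```

Leaving the properties off simply falls back to the defaults shown in the diff above.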
Changes to the JDBC wrapper (class JDBCWrapper):

```
@@ -21,15 +21,14 @@ import java.net.URI
import java.sql.{Connection, DatabaseMetaData, Driver, DriverManager, ResultSet, ResultSetMetaData,
SQLException}

import collection.JavaConverters._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.slf4j.LoggerFactory

import org.apache.hadoop.hive.llap.FieldDesc
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory
import org.apache.hadoop.hive.serde2.typeinfo._
import org.slf4j.LoggerFactory

import org.apache.spark.sql.types._


@@ -259,43 +258,48 @@ class JDBCWrapper

private def getCatalystType(typeInfo: TypeInfo) : DataType = {
typeInfo.getCategory match {
case Category.PRIMITIVE => getCatalystType(typeInfo.asInstanceOf[PrimitiveTypeInfo])
case Category.LIST => ArrayType(
getCatalystType(typeInfo.asInstanceOf[ListTypeInfo].getListElementTypeInfo))
case Category.MAP => MapType(
case Category.PRIMITIVE =>
getCatalystType(typeInfo.asInstanceOf[PrimitiveTypeInfo])
case Category.LIST =>
ArrayType(getCatalystType(typeInfo.asInstanceOf[ListTypeInfo].getListElementTypeInfo))
case Category.MAP =>
MapType(
getCatalystType(typeInfo.asInstanceOf[MapTypeInfo].getMapKeyTypeInfo),
getCatalystType(typeInfo.asInstanceOf[MapTypeInfo].getMapValueTypeInfo))
case Category.STRUCT => StructType(getCatalystStructFields(typeInfo.asInstanceOf[StructTypeInfo]))
case _ => throw new SQLException("Unsupported type " + typeInfo)
case Category.STRUCT =>
StructType(getCatalystStructFields(typeInfo.asInstanceOf[StructTypeInfo]))
case _ =>
throw new SQLException("Unsupported type " + typeInfo)
}
}

private def getCatalystType(primitiveTypeInfo: PrimitiveTypeInfo) : DataType = {
primitiveTypeInfo.getPrimitiveCategory match {
case PrimitiveCategory.BOOLEAN => BooleanType
case PrimitiveCategory.BYTE => ByteType
case PrimitiveCategory.SHORT => ShortType
case PrimitiveCategory.INT => IntegerType
case PrimitiveCategory.LONG => LongType
case PrimitiveCategory.FLOAT => FloatType
case PrimitiveCategory.DOUBLE => DoubleType
case PrimitiveCategory.STRING => StringType
case PrimitiveCategory.CHAR => StringType
case PrimitiveCategory.VARCHAR => StringType
case PrimitiveCategory.DATE => DateType
case PrimitiveCategory.BOOLEAN => BooleanType
case PrimitiveCategory.BYTE => ByteType
case PrimitiveCategory.SHORT => ShortType
case PrimitiveCategory.INT => IntegerType
case PrimitiveCategory.LONG => LongType
case PrimitiveCategory.FLOAT => FloatType
case PrimitiveCategory.DOUBLE => DoubleType
case PrimitiveCategory.STRING => StringType
case PrimitiveCategory.CHAR => StringType
case PrimitiveCategory.VARCHAR => StringType
case PrimitiveCategory.DATE => DateType
case PrimitiveCategory.TIMESTAMP => TimestampType
case PrimitiveCategory.BINARY => BinaryType
case PrimitiveCategory.DECIMAL => DecimalType(
case PrimitiveCategory.BINARY => BinaryType
case PrimitiveCategory.DECIMAL => DecimalType(
primitiveTypeInfo.asInstanceOf[DecimalTypeInfo].getPrecision,
primitiveTypeInfo.asInstanceOf[DecimalTypeInfo].getScale)
case _ => throw new SQLException("Unsupported type " + primitiveTypeInfo)
}
}

private def getCatalystStructFields(structTypeInfo: StructTypeInfo) : Array[StructField] = {
structTypeInfo.getAllStructFieldNames.asScala.zip(structTypeInfo.getAllStructFieldTypeInfos.asScala).map(
structTypeInfo.getAllStructFieldNames.asScala
.zip(structTypeInfo.getAllStructFieldTypeInfos.asScala).map(
{ case (fieldName, fieldType) => new StructField(fieldName, getCatalystType(fieldType)) }
).toArray
).toArray
}

private def getCatalystType(typeName: String) : DataType = {
```
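The reflowed `getCatalystType` cases above map Hive `TypeInfo` values onto Spark Catalyst types. The standalone sketch below illustrates that mapping for a decimal and a struct column without calling the private helper itself; `TypeInfoUtils.getTypeInfoFromTypeString` is Hive's public parser for type strings, and the expected `StructType` is written out by hand for comparison:

```scala
import org.apache.hadoop.hive.serde2.typeinfo.{DecimalTypeInfo, TypeInfoUtils}

import org.apache.spark.sql.types._

object TypeMappingSketch {
  def main(args: Array[String]): Unit = {
    // Parse a Hive type string into a TypeInfo, the input the wrapper works with.
    val decimalInfo = TypeInfoUtils
      .getTypeInfoFromTypeString("decimal(10,2)")
      .asInstanceOf[DecimalTypeInfo]

    // A Hive decimal(10,2) column maps to Spark's DecimalType(10, 2),
    // mirroring the PrimitiveCategory.DECIMAL branch above.
    val catalystDecimal = DecimalType(decimalInfo.getPrecision, decimalInfo.getScale)
    println(catalystDecimal) // DecimalType(10,2)

    // A Hive struct<name:string,age:int> column maps to a StructType with one
    // StructField per struct field, mirroring getCatalystStructFields.
    val expectedStruct = StructType(Seq(
      StructField("name", StringType),
      StructField("age", IntegerType)))
    println(expectedStruct.simpleString) // struct<name:string,age:int>
  }
}
```

The printed values match what the `Category.PRIMITIVE`/`DECIMAL` and `Category.STRUCT` branches produce for those Hive types.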
Changes to RowConverter:

```
@@ -16,10 +16,12 @@
*/
package com.hortonworks.spark.sql.hive.llap

import collection.JavaConverters._
import org.apache.hadoop.hive.llap.{Schema}
import scala.collection.JavaConverters._

import org.apache.hadoop.hive.llap.Schema
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category
import org.apache.hadoop.hive.serde2.typeinfo._

import org.apache.spark.sql.Row

object RowConverter {
@@ -44,14 +46,13 @@ object RowConverter
listElement => convertValue(
listElement,
colType.asInstanceOf[ListTypeInfo].getListElementTypeInfo))
case Category.MAP => {
case Category.MAP =>
// Try LinkedHashMap to preserve order of elements - is that necessary?
var convertedMap = scala.collection.mutable.LinkedHashMap.empty[Any, Any]
var map = scala.collection.mutable.LinkedHashMap.empty[Any, Any]
value.asInstanceOf[java.util.Map[Any, Any]].asScala.foreach((tuple) =>
convertedMap(convertValue(tuple._1, colType.asInstanceOf[MapTypeInfo].getMapKeyTypeInfo)) =
map(convertValue(tuple._1, colType.asInstanceOf[MapTypeInfo].getMapKeyTypeInfo)) =
convertValue(tuple._2, colType.asInstanceOf[MapTypeInfo].getMapValueTypeInfo))
convertedMap
}
map
case Category.STRUCT =>
// Struct value is just a list of values. Convert each value based on corresponding typeinfo
Row.fromSeq(
```
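The `Category.MAP` branch above keeps its approach and only renames the accumulator: it walks the incoming `java.util.Map` and inserts converted keys and values into a `LinkedHashMap` so element order is preserved. A self-contained sketch of that pattern, with a trivial widening function standing in for `RowConverter.convertValue` (this sketch does not call the real converter):

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashMap

object MapConversionSketch {
  // Stand-in for convertValue: widen boxed Integers to Long, just to make it
  // visible that both keys and values pass through a conversion step.
  private def convert(v: Any): Any = v match {
    case i: java.lang.Integer => i.longValue()
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val hiveMap = new java.util.LinkedHashMap[Any, Any]()
    hiveMap.put("a", Int.box(1))
    hiveMap.put("b", Int.box(2))

    // Same shape as the Category.MAP branch: iterate the Java map and build a
    // Scala LinkedHashMap with converted keys and values.
    val converted = LinkedHashMap.empty[Any, Any]
    hiveMap.asScala.foreach { case (k, v) => converted(convert(k)) = convert(v) }

    println(converted) // Map(a -> 1, b -> 2)
  }
}
```

Whether a `LinkedHashMap` is strictly necessary is the open question in the code comment; the sketch keeps it only because it preserves insertion order.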
Changes to LlapExternalCatalog:

```
@@ -229,7 +229,8 @@ private[spark] class LlapExternalCatalog(
val nullable = true // Hive cols nullable
val isSigned = true
val columnType =
DefaultJDBCWrapper.getCatalystType(dataType, typeName, fieldSize, fieldScale, isSigned)
DefaultJDBCWrapper.getCatalystType(
dataType, typeName, fieldSize, fieldScale, isSigned)
schema.add(columnName, columnType, nullable)
}
CatalogTable(
```
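The reformatted call above feeds JDBC column metadata (`dataType`, `typeName`, `fieldSize`, `fieldScale`, `isSigned`) through `DefaultJDBCWrapper.getCatalystType` and accumulates the results with `schema.add`. The sketch below shows the same accumulation pattern against a plain `java.sql.ResultSetMetaData`; its `toCatalystType` helper is a deliberately simplified stand-in, not the wrapper's actual mapping:

```scala
import java.sql.{ResultSetMetaData, Types}

import org.apache.spark.sql.types._

object SchemaFromMetadataSketch {
  // Simplified stand-in for DefaultJDBCWrapper.getCatalystType: map a few JDBC
  // type codes to Catalyst types. The real mapping also uses the type name,
  // signedness, and more type codes.
  private def toCatalystType(jdbcType: Int, precision: Int, scale: Int): DataType =
    jdbcType match {
      case Types.INTEGER => IntegerType
      case Types.BIGINT => LongType
      case Types.VARCHAR | Types.CHAR => StringType
      case Types.DECIMAL => DecimalType(precision, scale)
      case _ => StringType // fallback, for illustration only
    }

  def schemaFor(meta: ResultSetMetaData): StructType = {
    var schema = new StructType()
    for (i <- 1 to meta.getColumnCount) {
      val columnType =
        toCatalystType(meta.getColumnType(i), meta.getPrecision(i), meta.getScale(i))
      // As in the catalog code, every Hive column is added as nullable.
      schema = schema.add(meta.getColumnName(i), columnType, nullable = true)
    }
    schema
  }
}
```

Passing the metadata of any JDBC `ResultSet` to `schemaFor` yields a `StructType` analogous to the one the catalog builds for a Hive table.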
