
streaming to struct type column results in nulls #255

Open
massoudm opened this issue Nov 5, 2018 · 6 comments

Comments


massoudm commented Nov 5, 2018

I'm trying to write to a table with a struct-type column, and all fields in that column are null after saving.
HDP 3.0.1.0-187
hive-warehouse-connector_2.11-1.0.0.3.0.1.0-187.jar

Here is code to reproduce:

CREATE TABLE `test`(
    `messageid` string,
    `tag` struct<`name`:string,`value`:int>
)
STORED AS ORC 
LOCATION
  '/user/hive/hivedata/test'
TBLPROPERTIES ("transactional"="true")

Spark:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val sparkConf = new SparkConf()
  .set("spark.sql.streaming.checkpointLocation", "./checkpoint")

val sparkSession = SparkSession.builder()
  .appName("HiveStreamingExample")
  .config(sparkConf)
  .enableHiveSupport()
  .getOrCreate()

import com.hortonworks.hwc.HiveWarehouseSession
import sparkSession.implicits._

case class Tag(name: String, value: Int)
case class Message(messageid: String, tag: Tag)

val data = List(
   Message("id_1", Tag("tag_1", 1) ),
   Message("id_2", Tag("tag_2", 2) )
)

val messages = data.toDF().as[Message]

messages.printSchema

messages
    .write
    .format(HiveWarehouseSession.DATAFRAME_TO_STREAM)
    .option("table", "test")
    .save()

messages.show(10, false)
+---------+----------+
|messageid|tag       |
+---------+----------+
|id_1     |[tag_1, 1]|
|id_2     |[tag_2, 2]|
+---------+----------+

but when I query the table in Hive:

SELECT * FROM test

I get:

"test.messageid","test.tag"
"id_1","{""name"":null,""value"":null}"
"id_2","{""name"":null,""value"":null}"

Since I cannot find any code example that streams to struct columns, I believe this may be an undetected bug.

P.S.: The same code works fine if I use HIVE_WAREHOUSE_CONNECTOR instead of DATAFRAME_TO_STREAM.
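For context, here is my understanding of why the delimited path loses the nesting (illustration only; the record strings below are my own construction, not the exact bytes HWC emits):

```scala
// Illustration only: how a row with a struct column might be encoded by a
// delimited writer vs. a JSON writer. Not actual HWC/Hive streaming code.
case class Tag(name: String, value: Int)
case class Message(messageid: String, tag: Tag)

val msg = Message("id_1", Tag("tag_1", 1))

// A StrictDelimitedInputWriter-style record is one flat, delimiter-separated
// line; the struct collapses into a toString-like rendering that the
// delimited parser cannot map back onto name/value -> nulls in Hive.
val delimited = s"${msg.messageid},[${msg.tag.name},${msg.tag.value}]"

// A StrictJsonWriter-style record keeps the nesting explicit, so the struct
// sub-fields survive the round trip.
val json =
  s"""{"messageid":"${msg.messageid}","tag":{"name":"${msg.tag.name}","value":${msg.tag.value}}}"""

println(delimited) // id_1,[tag_1,1]
println(json)      // {"messageid":"id_1","tag":{"name":"tag_1","value":1}}
```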

@massoudm (Author)

I have found the problem: HiveStreamingDataWriter uses StrictDelimitedInputWriter and does not account for struct types.
I'm trying to implement a JSON DataWriter, but cannot find a branch that builds. Can anyone point me to the right branch?


massoudm commented Nov 13, 2018

I managed to get the master branch to build.
Now my implementation of HiveStreamingJsonDataWriter throws an exception when it finishes writing the first batch and tries to create a new transaction. The issue seems to be related to assembly shading:

java.lang.ClassCastException: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat cannot be cast to shadehive.org.apache.hadoop.hive.ql.io.AcidOutputFormat
at org.apache.hive.streaming.AbstractRecordWriter.init(AbstractRecordWriter.java:164)
at org.apache.hive.streaming.HiveStreamingConnection$TransactionBatch.<init>(HiveStreamingConnection.java:669)
at org.apache.hive.streaming.HiveStreamingConnection$TransactionBatch.<init>(HiveStreamingConnection.java:596)
at org.apache.hive.streaming.HiveStreamingConnection.createNewTransactionBatch(HiveStreamingConnection.java:485)
at org.apache.hive.streaming.HiveStreamingConnection.beginNextTransaction(HiveStreamingConnection.java:478)
at org.apache.hive.streaming.HiveStreamingConnection.beginTransaction(HiveStreamingConnection.java:507)
at com.hortonworks.spark.sql.hive.llap.HiveStreamingJsonDataWriter.write(HiveStreamingJsonDataWriter.java:110)
at com.hortonworks.spark.sql.hive.llap.HiveStreamingJsonDataWriter.write(HiveStreamingJsonDataWriter.java:21)

The fix for this issue is included in HIVE-20059, so I will try to apply it to Hive 3.1.1 and build it locally to see if that resolves the problem.
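A toy reproduction of that ClassCastException, to show why shading causes it (the package names here are mine, not HWC code):

```scala
// Two classes with the same simple name in different packages are unrelated
// types to the JVM, so casting one to the other fails -- exactly what happens
// when the unshaded OrcOutputFormat meets shadehive...AcidOutputFormat above.
object unshaded { class OrcOutputFormat }
object shadehive { class OrcOutputFormat }

val fmt: Any = new unshaded.OrcOutputFormat
val castOk =
  try { fmt.asInstanceOf[shadehive.OrcOutputFormat]; true }
  catch { case _: ClassCastException => false }

println(castOk) // false
```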


mazar commented Nov 26, 2018

Created pull request #258.

@massoudm (Author)

The PR above adds the following format option to select StrictJsonWriter:

messages
    .write
    .format(HiveWarehouseSession.DATAFRAME_TO_STREAM)
    .option("writer", "json")
    .option("table", "test")
    .save()

@GrzesiuKo

I have the same problem, but in Structured Streaming, using HiveWarehouseSession.STREAM_TO_STREAM. As @massoudm described, I am also writing to a struct-type column and all fields come back null.

@GrzesiuKo

@massoudm's changes worked for me in Structured Streaming.

What I did:

  1. Cloned @massoudm's branch
  2. Ran sbt assembly in the project root directory
  3. Used the newly built HWC jar
  4. My code:

data
  .writeStream
  .queryName(config("stream.name") + "_query")
  .options(hiveConfig)
  .option("writer", "json")
  .format(HiveWarehouseSession.STREAM_TO_STREAM)
  .outputMode("append")
  .start()

The most important parts are:

.option("writer", "json")
.format(HiveWarehouseSession.STREAM_TO_STREAM)
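For anyone else landing here, a fuller sketch of the stream-to-stream path (a sketch under assumptions: the rate source, table name, query name, and checkpoint path are placeholders for your own source and Hive config, and this needs a running cluster):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, struct}
import com.hortonworks.hwc.HiveWarehouseSession

// Sketch only: wires the "writer" -> "json" option from the PR above into a
// complete streaming query. Source, table, and checkpoint are placeholders.
val spark = SparkSession.builder()
  .appName("StructStreamingSketch")
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._

val source = spark.readStream
  .format("rate") // placeholder source; swap in Kafka etc.
  .load()

// Shape the stream to match the target table's schema, struct column included.
val data = source.select(
  $"value".cast("string").as("messageid"),
  struct(lit("tag_1").as("name"), lit(1).as("value")).as("tag"))

data.writeStream
  .queryName("struct_stream_query")
  .option("writer", "json") // select StrictJsonWriter instead of the delimited writer
  .option("table", "test")
  .option("checkpointLocation", "/tmp/struct_stream_ck")
  .format(HiveWarehouseSession.STREAM_TO_STREAM)
  .outputMode("append")
  .start()
```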
