[Need help] Join HBase dataframe with a Structured Stream #235

Open
ser0t0nin opened this issue Mar 30, 2018 · 2 comments

Comments

@ser0t0nin

I have a similar question on StackOverflow.

I have a Kafka stream with updates to objects stored in HBase. Each update carries a version and a timestamp of the change. I need to determine whether my HBase object is newer or older than the one I got from Kafka. The best way to do that is to join the stream with a static HBase DataFrame on objectId and filter out the rows whose version is lower than the one already stored; the rest are to be updated (or upserted). An alternative would be to set the HBase version timestamp of each row manually from the update timestamp. So my code (Python) looks like this:
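To make the intended semantics concrete, here is a plain-Python sketch of the left join plus version filter described above (this is not Spark code; the names `newer_updates`, `incoming`, and `stored_versions` are illustrative only):

```python
# Sketch of the intended upsert logic: left-join incoming Kafka updates
# against the versions already stored in HBase, treat a missing row as
# version 0, and keep only strictly newer updates.

def newer_updates(incoming, stored_versions):
    """incoming: list of dicts with 'id' and 'version' keys;
    stored_versions: dict mapping id -> hbase_version."""
    result = []
    for row in incoming:
        # mirrors .na.fill({'hbase_version': 0}) for unmatched rows
        hbase_version = stored_versions.get(row['id'], 0)
        # mirrors .filter(col('version') > col('hbase_version'))
        if int(row['version']) > hbase_version:
            result.append(row)
    return result

updates = [
    {'id': 'a', 'version': 3},  # stored at version 2 -> keep
    {'id': 'b', 'version': 1},  # stored at version 5 -> drop
    {'id': 'c', 'version': 1},  # not stored yet      -> keep
]
print(newer_updates(updates, {'a': 2, 'b': 5}))
```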

spark \
    .readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'kafka01:9092,kafka02:9092,kafka03:9092') \
    .option('subscribe', 'youdo') \
    .option('group.id', 'spark') \
    .option('startingOffsets', 'earliest') \
    .load() \
    .withColumn(
        'decoded', 
        from_json(
            col('value').cast('string'),
            schema
        )
    ) \
    .select(
        'decoded.body.*',
        'timestamp'
    ) \
    .join(
        sqlc.read \
            .format('org.apache.spark.sql.execution.datasources.hbase') \
            .options(catalog=catalog_hbase) \
            .load() \
            .select('id', col('hbase_version').cast('integer')),
        ['id'],
        'left'
    ) \
    .na.fill({'hbase_version': 0}) \
    .filter(col('version').cast('integer') > col('hbase_version')) \
    .na.fill('null') \
    .writeStream \
    .outputMode("append") \
    .format('HBase.HBaseSinkProvider') \
    .option('hbasecat', catalog_kafka) \
    .option('checkpointLocation', '/tmp/checkpoint') \
    .start() \
    .awaitTermination()

But I always get this error: `StreamingQueryException: key not found: hbase_version`.
It seems like this type of join is not supported, or maybe I'm missing something?

The resulting solution must be in Python for me, but it is OK to pass some additional jars with spark-submit.

@sbarnoud

sbarnoud commented May 9, 2018

Hi,

In my opinion you should replace `.select('id', col('hbase_version').cast('integer'))` with `.selectExpr('id', 'cast(hbase_version as integer) as hbase_version')`, or use `.alias()` on the cast column, so the column keeps the name `hbase_version`.

Did you try with the latest HBaseSink from #238?
I'm also wondering about `.option('hbasecat', catalog_kafka)`: the option name for passing the HBase schema is now `hbase.catalog`; 'hbasecat' doesn't mean anything to me.

@omkarahane

@ser0t0nin I'm also trying to connect to HBase via Spark Structured Streaming but failing to do so. For writing the data I'm using the Spark Hortonworks connector, with the following code:

kafkaIpStream.selectExpr("cast (key as String)", "cast (value as String)")
      .withColumn("ts", split($"key", "/")(1))
      .selectExpr("key as rowkey", "ts", "value as val")
      .writeStream
      .option("failOnDataLoss", false)
      .outputMode(OutputMode.Update())
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF
          .write
          .options(Map(HBaseTableCatalog.tableCatalog -> HbaseTableConf.getRawtableConf(HbaseConstant.hbaseRawTable), HBaseTableCatalog.newTable -> "5"))
          .format("org.apache.spark.sql.execution.datasources.hbase").save
      }.start()

But I always end up with an exception:

`Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.NamespaceNotFoundException`

Let me know if you can help; you can find the details of the issue at https://stackoverflow.com/questions/58306725/not-able-to-write-data-to-hbase-using-spark-structured-streaming

Appreciate the help.

3 participants