HBaseTableCatalog not found while spark2-submit #314

Open
ttewary92 opened this issue May 9, 2019 · 0 comments

@ttewary92
Hello,

I am running into an issue when writing to HBase from Spark using the SHC jar.

Spark: 2.1.0
HBase (on cluster): 1.2.0

Spark submit statement:
spark2-submit --master yarn --deploy-mode client --files /etc/hbase/conf/hbase-site.xml --class com.bde.datalake.test.Prepickup /home/datalake/tt/bde_datalake_2.11-1.0.jar --jars /home/datalake/tt/shc-core-1.1.0-2.1-s_2.11.jar --conf "spark.driver.extraClassPath=/home/datalake/tt/shc-core-1.1.0-2.1-s_2.11.jar" --packages com.hortonworks:shc:1.1.0-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/repositories/releases/
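Note that spark-submit treats everything after the application jar as arguments to the main class, so in the command above the --jars, --conf, --packages, and --repositories options are handed to Prepickup instead of to spark2-submit itself. A reordered sketch with the same paths (and the Hortonworks artifact, which is usually published as shc-core rather than shc) would be:

spark2-submit --master yarn --deploy-mode client \
  --files /etc/hbase/conf/hbase-site.xml \
  --jars /home/datalake/tt/shc-core-1.1.0-2.1-s_2.11.jar \
  --conf "spark.driver.extraClassPath=/home/datalake/tt/shc-core-1.1.0-2.1-s_2.11.jar" \
  --packages com.hortonworks:shc-core:1.1.0-2.1-s_2.11 \
  --repositories http://repo.hortonworks.com/content/repositories/releases/ \
  --class com.bde.datalake.test.Prepickup \
  /home/datalake/tt/bde_datalake_2.11-1.0.jar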

Following is my code:
package com.bde.datalake.test

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
import org.apache.spark.storage._
import org.apache.hadoop.hbase.{ TableName, HBaseConfiguration }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.{ Result, Put }

object Prepickup {

  val spark = SparkSession
    .builder()
    .appName("CT_Spark_parellel_test")
    .enableHiveSupport()
    .getOrCreate()

  import spark.implicits._

  spark.sql("set spark.dynamicAllocation.enabled = true")
  spark.sql("set spark.shuffle.service.enabled = true")
  spark.sql("set spark.dynamicAllocation.minExecutors = 10")
  spark.sql("set spark.dynamicAllocation.maxExecutors = 25")
  spark.sql("set spark.executor.cores = 5")
  spark.sql("set spark.driver.memory = 15g")
  spark.sql("set spark.executor.memory = 30g")
  spark.sql("set spark.driver.extraClassPath = /fae/conf/hadoop/hbase-site.xml")

  def main(args: Array[String]) = {

    val config = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum", "172.18.114.97")
    config.set("hbase.zookeeper.property.clientPort", "2181")
    config.set(TableInputFormat.INPUT_TABLE, "WFNSP.PBL_OPS_PREPICKUP")

    // Optional scan narrowing, currently disabled:
    // config.set(TableInputFormat.SCAN_ROW_START, "startrowkey")
    // config.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey")
    // config.set(TableInputFormat.SCAN_COLUMNS, ...)

    val columnFamily = "CAT"

    val prepickupRDD = spark.sparkContext.newAPIHadoopRDD(
      config, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

    val prepickupDF = prepickupRDD.map(x => (
        Bytes.toString(x._2.getRow()),
        Bytes.toString(x._2.getValue(Bytes.toBytes("CAT"), Bytes.toBytes("COL1"))),
        Bytes.toString(x._2.getValue(Bytes.toBytes("CAT"), Bytes.toBytes("DRLDDT")))))
      .toDF()
      .withColumnRenamed("_1", "PK")
      .withColumnRenamed("_2", "COL1")
      .withColumnRenamed("_3", "DRLDDT")

    prepickupDF.createOrReplaceTempView("prepickupDF")
    
    val prepickupDF1 = spark.sql(
      """SELECT PK,
        |  SPLIT(COL1, '~')[0]  AS AWB,
        |  SPLIT(COL1, '~')[1]  AS STTY,
        |  SPLIT(COL1, '~')[2]  AS STCD,
        |  SPLIT(COL1, '~')[3]  AS STD,
        |  SPLIT(COL1, '~')[4]  AS STGR,
        |  SPLIT(COL1, '~')[5]  AS PURT,
        |  SPLIT(COL1, '~')[6]  AS PRCD,
        |  SPLIT(COL1, '~')[7]  AS PRTY,
        |  SPLIT(COL1, '~')[8]  AS SPRD,
        |  SPLIT(COL1, '~')[9]  AS PKTY,
        |  SPLIT(COL1, '~')[10] AS CPRCD,
        |  SPLIT(COL1, '~')[11] AS PUDT,
        |  SPLIT(COL1, '~')[12] AS PUTM,
        |  SPLIT(COL1, '~')[13] AS STDT,
        |  SPLIT(COL1, '~')[14] AS STTM,
        |  SPLIT(COL1, '~')[15] AS CMARE
        |FROM prepickupDF""".stripMargin)
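    // The same reshaping, splitting COL1 once with the DataFrame API instead of
    // sixteen times in SQL (an equivalent sketch, kept commented out):
    //   import org.apache.spark.sql.functions.split
    //   val names = Seq("AWB", "STTY", "STCD", "STD", "STGR", "PURT", "PRCD", "PRTY",
    //                   "SPRD", "PKTY", "CPRCD", "PUDT", "PUTM", "STDT", "STTM", "CMARE")
    //   val parts = split($"COL1", "~")
    //   val prepickupDF1 = names.zipWithIndex
    //     .foldLeft(prepickupDF)((df, ni) => df.withColumn(ni._1, parts.getItem(ni._2)))
    //     .select("PK", names: _*)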

    println(prepickupDF.count())

    def catalog1 =
      s"""{
         |"table":{"namespace":"default", "name":"PREPICKUP1"},
         |"rowkey":"PK",
         |"columns":{
         |"PK":{"cf":"rowkey", "col":"PK", "type":"string"},
         |"AWB":{"cf":"CAT", "col":"AWB", "type":"string"},
         |"STTY":{"cf":"CAT", "col":"STTY", "type":"string"},
         |"STCD":{"cf":"CAT", "col":"STCD", "type":"string"},
         |"STD":{"cf":"CAT", "col":"STD", "type":"string"},
         |"STGR":{"cf":"CAT", "col":"STGR", "type":"string"},
         |"PURT":{"cf":"CAT", "col":"PURT", "type":"string"},
         |"PRCD":{"cf":"CAT", "col":"PRCD", "type":"string"},
         |"PRTY":{"cf":"CAT", "col":"PRTY", "type":"string"},
         |"SPRD":{"cf":"CAT", "col":"SPRD", "type":"string"},
         |"PKTY":{"cf":"CAT", "col":"PKTY", "type":"string"},
         |"CPRCD":{"cf":"CAT", "col":"CPRCD", "type":"string"},
         |"PUDT":{"cf":"CAT", "col":"PUDT", "type":"string"},
         |"PUTM":{"cf":"CAT", "col":"PUTM", "type":"string"},
         |"STDT":{"cf":"CAT", "col":"STDT", "type":"string"},
         |"STTM":{"cf":"CAT", "col":"STTM", "type":"string"},
         |"CMARE":{"cf":"CAT", "col":"CMARE", "type":"string"}
         |}
         |}""".stripMargin

    prepickupDF1.write
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog1, HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
}
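For completeness, the same catalog can be used to read the table back through SHC (a minimal sketch, assuming the write above succeeds and catalog1 is in scope):

    val readBack = spark.read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog1))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()
    readBack.show(5)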

build.sbt

name := "bde_datalake"

version := "1.0"

scalaVersion in ThisBuild := "2.11.8"

val sparkVersion = "2.1.0"

dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.1"

dependencyOverrides += "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.6.7.1"

unmanagedJars in Compile += file("lib/shc-core-1.1.0-2.1-s_2.11.jar")

resolvers += "SparkPackages" at "https://dl.bintray.com/spark-packages/maven/"
resolvers += Resolver.url("bintray-sbt-plugins", url("http://dl.bintray.com/sbt/sbt-plugin-releases"))
resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,

  "org.apache.kafka" % "kafka-clients" % "0.8.2.0",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0",
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.hadoop" % "hadoop-common" % "2.6.0",

  //"org.apache.hbase" % "hbase-spark" % "1.2.0-cdh5.13.1",
  "org.apache.hbase" % "hbase-common" % "1.1.2",
  "org.apache.hbase" % "hbase-client" % "1.1.2",
  "org.apache.hbase" % "hbase-server" % "1.1.2",
  "org.scala-lang" % "scala-library" % "2.11.8",
  "org.scala-lang" % "scala-reflect" % "2.11.8",
  "edu.stanford.nlp" % "stanford-corenlp" % "3.7.0",
  "edu.stanford.nlp" % "stanford-corenlp" % "3.7.0" % "test" classifier "models",
  "databricks" % "spark-corenlp" % "0.2.0-s_2.11"
)

resolvers ++= Seq(
  "Hortonworks Releases" at "http://repo.hortonworks.com/content/repositories/releases/",
  "Java.net Maven2 Repository" at "http://download.java.net/maven/2/"
)

fork in run := true
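Since shc-core is only pulled in as an unmanaged jar, sbt package produces a thin jar that does not contain HBaseTableCatalog. A common way to rule out classpath problems (a sketch assuming sbt-assembly; the version and merge rules are illustrative) is to ship a fat jar instead:

    // project/plugins.sbt
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

    // build.sbt additions
    assemblyMergeStrategy in assembly := {
      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
      case _                             => MergeStrategy.first
    }

and then submit the jar produced by sbt assembly.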

I get the following error while running the code:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/hbase/HBaseTableCatalog$
at com.bde.datalake.test.Prepickup$.main(Prepickup.scala:135)
at com.bde.datalake.test.Prepickup.main(Prepickup.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 12 more

I have added the SHC jar, the package, and the repository to spark-submit, but the class is still not found at runtime.
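One quick sanity check (a diagnostic sketch, using the path from the submit command above) is whether the class is actually inside the jar being shipped:

    jar tf /home/datalake/tt/shc-core-1.1.0-2.1-s_2.11.jar | grep HBaseTableCatalog

If it is, the jar is most likely never reaching the driver classpath; note the spark-submit argument ordering discussed above.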
