
Custom sink provider for structured streaming #238

Open · wants to merge 5 commits into base: master

Conversation

@sutugin commented Apr 4, 2018

What changes were proposed in this pull request?

Adds a custom sink provider so that SHC can be used in structured streaming jobs.
#205
All HBase-related options must be set with the "hbase." prefix.

How was this patch tested?

inputDF
  .writeStream
  .queryName("hbase writer")
  .format("hbase")
  .option("checkpointLocation", checkPointProdPath)
  .options(Map(
    "hbase.schema_array" -> schema_array,
    "hbase.schema_record" -> schema_record,
    "hbase.catalog" -> catalog))
  .outputMode(OutputMode.Update())
  .trigger(Trigger.ProcessingTime(30.seconds))
  .start()

Run a structured streaming job and write to HBase.

@weiqingy (Contributor) commented Apr 5, 2018

Thanks for this PR, @sutugin

cc: @dongjoon-hyun

@merlintang (Contributor) left a comment

LGTM

@sbarnoud commented

You forget that SHC supports Avro schemas. The user should be able to pass any key in the options to define them.

@sbarnoud commented Apr 12, 2018

For the options, I propose to use:

import java.util.Locale

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink

class HBaseStreamSink(sqlContext: SQLContext, options: Map[String, String]) extends Sink {

  val defaultFormat = "org.apache.spark.sql.execution.datasources.hbase"

  // Keep only options prefixed with "hbase." and strip the prefix.
  val specifiedHBaseParams =
    options
      .keySet
      .filter(_.toLowerCase(Locale.ROOT).startsWith("hbase."))
      .map { k => k.drop(6) -> options(k) }
      .toMap

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {

    /** As per SPARK-16020, arbitrary transformations are not supported, but
      * converting to an RDD allows us to do magic.
      */
    val df = sqlContext.sparkSession.createDataFrame(data.rdd, data.schema)
    df.write
      .options(specifiedHBaseParams)
      .format(defaultFormat)
      .save()
  }
}

@sbarnoud commented Apr 12, 2018

Then,
-) the sink doesn't update Spark counters ... it should be done somewhere
-) the sink short name is not registered
And finally, the Avro type is not working because the Avro serializer swaps fields. It looks similar to https://issues.apache.org/jira/browse/SPARK-21402
I'm using the following Avro schema:

{"namespace": "example.avro", "type": "record", "name": "DocumentPojo","fields": [
	{"name": "id", "type": "string"},
	{"name": "key",  "type": "string"},
	{"name": "last", "type": "boolean"},
	{"name": "columns", "type": {"type": "map", "values": "string"}}]
}

And I get the following exception:

java.lang.ClassCastException: java.lang.Boolean cannot be cast to scala.collection.immutable.Map
        at org.apache.spark.sql.execution.datasources.hbase.types.SchemaConverters$$anonfun$createConverterToAvro$6.apply(Avro.scala:281)
        at org.apache.spark.sql.execution.datasources.hbase.types.SchemaConverters$$anonfun$createConverterToAvro$7.apply(Avro.scala:304)
        at org.apache.spark.sql.execution.datasources.hbase.types.Avro.toBytes(Avro.scala:56)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation$$anonfun$org$apache$spark$sql$execution$datasources$hbase$HBaseRelation$$convertToPut$1$1.apply(HBaseRelation.scala:221)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation$$anonfun$org$apache$spark$sql$execution$datasources$hbase$HBaseRelation$$convertToPut$1$1.apply(HBaseRelation.scala:217)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

@sbarnoud commented

I just declared the Avro schema with the field names in alphabetical order, and it works.

No idea where the problem is, but it has to be corrected.

{"namespace": "example.avro", "type": "record", "name": "DocumentPojo","fields": [
	{"name": "columns", "type": {"type": "map", "values": "string"}},
	{"name": "id", "type": "string"},
	{"name": "key",  "type": "string"},
	{"name": "last", "type": "boolean"}]
}

Registered the short name "hbase" for the sink provider.
All HBase-related options are checked for the "hbase." prefix (like catalog, newtable, etc.).
@sutugin (Author) commented Apr 12, 2018

@sbarnoud, unfortunately I have never used Avro and can't comment on it. The only thing I can assume is that the columns in the incoming streaming dataset need to be ordered exactly as specified in the Avro schema.
"the sink doesn't update Spark counters ... it should be done somewhere" - what specific counters do you mean?

@sbarnoud commented Apr 12, 2018

Hi,

For counters:

// Inside a StreamingQueryListener implementation:
@Override
public void onQueryProgress(QueryProgressEvent event) {
    log.info("QueryProgressEvent event :" + event.progress().numInputRows());
}

QueryProgressEvent contains some counters like numInputRows ... that are not updated.
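
For context, a minimal Scala sketch of how such a listener is registered so that QueryProgressEvent is delivered (assuming a SparkSession named `spark` is in scope):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Register a listener on the session; Spark calls onQueryProgress once per micro-batch.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"numInputRows = ${event.progress.numInputRows}")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})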

For Avro, I found the bug, but didn't succeed in testing it (my patch is not loaded first ...).
The bug is because org.apache.spark.sql.execution.datasources.hbase.types.Avro doesn't use the same schema to serialize and to deserialize.
Here is the code:

class Avro(f:Option[Field] = None) extends SHCDataType {

  def fromBytes(src: HBaseType): Any = {
    if (f.isDefined) {
      val m = AvroSerde.deserialize(src, f.get.exeSchema.get)
      val n = f.get.avroToCatalyst.map(_ (m))
      n.get
    } else {
      throw new UnsupportedOperationException(
        "Avro coder: without field metadata, 'fromBytes' conversion can not be supported")
    }
  }

  def toBytes(input: Any): Array[Byte] = {
    // Here we assume the top level type is structType
    if (f.isDefined) {
      val record = f.get.catalystToAvro(input)
      AvroSerde.serialize(record, f.get.schema.get)
    } else {
      throw new UnsupportedOperationException(
        "Avro coder: Without field metadata, 'toBytes' conversion can not be supported")
    }
  }
}

As you can see, the Avro schema used to serialize is f.get.schema.get, which is the dataset schema, instead of f.get.exeSchema.get, which is the user-supplied one.
That's why serialization fails if the field names are not in the same order in BOTH.
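
A minimal, untested sketch of the fix described here (assuming `exeSchema` carries the user-supplied schema, as in the snippet above) would be to prefer `exeSchema` when serializing:

def toBytes(input: Any): Array[Byte] = {
  // Here we assume the top level type is structType
  if (f.isDefined) {
    val record = f.get.catalystToAvro(input)
    // Prefer the user-supplied Avro schema; fall back to the dataset-derived one.
    val writerSchema = f.get.exeSchema.getOrElse(f.get.schema.get)
    AvroSerde.serialize(record, writerSchema)
  } else {
    throw new UnsupportedOperationException(
      "Avro coder: Without field metadata, 'toBytes' conversion can not be supported")
  }
}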

@sutugin (Author) commented Apr 12, 2018

@sbarnoud,

@sbarnoud commented Apr 13, 2018

Hi,

Sorry, but metrics reporting doesn't work for me. I already configured my job to have them, and if I change my sink to parquet on the same stream, I get them.
Did you try it? Do you see the same behavior?

I don't understand why you use pattern matching (and not startsWith) here:

private val hbaseSettings = parameters.filterKeys(
    _.toLowerCase matches hbaseOptionPrefix + "*") map {
    case (k, v) => (k.replace(hbaseOptionPrefix, ""), v)
}

If you really want to, please use the right pattern: hbase\..*
\. => to match a dot
.* => to match anything

scala>  "hbase.x" matches "hbase.*"
res5: Boolean = true

scala>  "hbasex" matches "hbase.*"
res6: Boolean = true

scala> "hbase.x" matches "hbase\\..*"
res7: Boolean = true

scala> "hbase.x" matches "hbase\\."
res8: Boolean = false

scala>
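
For reference, a prefix-based alternative without the regex (a sketch, reusing `parameters` and defining a `hbaseOptionPrefix` of "hbase.", which is an assumption not taken from the PR code):

import java.util.Locale

private val hbaseOptionPrefix = "hbase."

// Keep only keys that literally start with "hbase." and strip that prefix.
private val hbaseSettings = parameters
  .filterKeys(_.toLowerCase(Locale.ROOT).startsWith(hbaseOptionPrefix))
  .map { case (k, v) => (k.drop(hbaseOptionPrefix.length), v) }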

I will open an issue for Avro.

@sbarnoud commented

This code works, and avoids the current comment "allows us to do magic" ;-):

import java.util.Locale

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.execution.streaming.Sink

class HBaseStreamSink(sqlContext: SQLContext, options: Map[String, String]) extends Sink with Logging {

  val defaultFormat = "org.apache.spark.sql.execution.datasources.hbase"

  // Keep only options prefixed with "hbase." and strip the prefix.
  val specifiedHBaseParams =
    options
      .keySet
      .filter(_.toLowerCase(Locale.ROOT).startsWith("hbase."))
      .map { k => k.drop(6) -> options(k) }
      .toMap

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    // use a local variable to make sure the map closure doesn't capture the whole DataFrame
    val schema = data.schema
    // Convert the internal rows of the already-planned batch back to Row objects.
    val res = data.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row])
    }

    val df = sqlContext.sparkSession.createDataFrame(res, schema)
    df.write
      .options(specifiedHBaseParams)
      .format(defaultFormat)
      .save()
  }
}

18/04/13 14:26:11 INFO StreamExecution: Streaming query made progress: {
"id" : "3197f58d-9dc6-4d02-b081-e8fed6d5b57f",
"runId" : "1d313972-6150-48cf-b448-c32411f4d421",
"name" : "KafkaToHBaseStream",
"timestamp" : "2018-04-13T12:26:02.788Z",
"numInputRows" : 2,

@sutugin (Author) commented Apr 13, 2018

@sbarnoud, Hi! Thank you, great work!

@sbarnoud commented

Hi,

In my own version I have added support for the following "short names":
-) shc
-) shcstream

Could you validate those short names?

core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

org.apache.spark.sql.execution.datasources.hbase.DefaultSource
org.apache.spark.sql.execution.streaming.HBaseStreamSinkProvider

org/apache/spark/sql/execution/datasources/hbase/HBaseRelation.scala

private[sql] class DefaultSource extends RelationProvider with CreatableRelationProvider with DataSourceRegister {

  override def shortName(): String = "shc" // TODO Validate this name

org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala

class HBaseStreamSinkProvider
  extends DataSourceRegister
    with StreamSinkProvider {
  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): Sink = {
    new HBaseStreamSink(sqlContext,parameters)
  }

  override def shortName(): String = "shcstream" // TODO Validate this name
}
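
For illustration, a hypothetical usage of the streaming short name above (assuming it is registered via the META-INF/services entry shown, and reusing the option and variable names from this PR):

inputDF.writeStream
  .queryName("hbase writer")
  .format("shcstream")
  .option("checkpointLocation", checkPointProdPath)
  .options(Map("hbase.catalog" -> catalog))
  .outputMode(OutputMode.Update())
  .start()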

@sutugin (Author) commented Dec 15, 2018

@sbarnoud, hi!
I don't mind. Or do you suggest changing the names in this PR?

@sbarnoud commented

No, I'll let you decide the names, but I sent you the correct code. Both batch and streaming short names must be defined and different, and both classes must be in the resources.
As far as I can see, that isn't currently the case.

@sutugin (Author) commented Dec 17, 2018

I think that with the advent of Spark 2.4 this is no longer relevant; foreachBatch will solve all the problems of custom sinks (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch).
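
A minimal sketch of that foreachBatch approach (assuming Spark 2.4+, the existing SHC batch writer, and placeholder names such as `inputDF`, `catalog`, and `checkPointProdPath`):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
import org.apache.spark.sql.streaming.OutputMode

inputDF.writeStream
  .queryName("hbase writer")
  .option("checkpointLocation", checkPointProdPath)
  .outputMode(OutputMode.Update())
  // Each micro-batch is written with the existing SHC batch data source.
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
  .start()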
