diff --git a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableCatalog.scala b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableCatalog.scala
index 6a2b15bf..ead172cb 100644
--- a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableCatalog.scala
+++ b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableCatalog.scala
@@ -95,7 +95,7 @@ case class Field(
   // converter from catalyst to avro
   lazy val catalystToAvro: (Any) => Any ={
-    SchemaConverters.createConverterToAvro(dt, colName, "recordNamespace")
+    SchemaConverters.createConverterToAvro(dt, exeSchema.get, colName, "recordNamespace")
   }
 
   val dt =
diff --git a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/types/Avro.scala b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/types/Avro.scala
index 5b7f2dec..53c7a32a 100644
--- a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/types/Avro.scala
+++ b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/types/Avro.scala
@@ -56,7 +56,7 @@ class Avro(f:Option[Field] = None) extends SHCDataType {
     // Here we assume the top level type is structType
     if (f.isDefined) {
       val record = f.get.catalystToAvro(input)
-      AvroSerde.serialize(record, f.get.schema.get)
+      AvroSerde.serialize(record, f.get.exeSchema.get)
     } else {
       throw new UnsupportedOperationException(
         "Avro coder: Without field metadata, 'toBytes' conversion can not be supported")
@@ -243,7 +243,8 @@ object SchemaConverters {
   // writing Avro records out to disk.
   def createConverterToAvro(
       dataType: DataType,
-      structName: String,
+      avroType: Schema,
+      currentFieldName: String,
       recordNamespace: String): (Any) => Any = {
     dataType match {
@@ -257,7 +258,11 @@ object SchemaConverters {
       case TimestampType => (item: Any) =>
         if (item == null) null else item.asInstanceOf[Timestamp].getTime
       case ArrayType(elementType, _) =>
-        val elementConverter = createConverterToAvro(elementType, structName, recordNamespace)
+        val elementConverter = createConverterToAvro(
+          elementType,
+          avroType.getElementType,
+          avroType.getElementType.getName,
+          recordNamespace)
         (item: Any) => {
           if (item == null) {
             null
@@ -274,7 +279,11 @@ object SchemaConverters {
           }
         }
       case MapType(StringType, valueType, _) =>
-        val valueConverter = createConverterToAvro(valueType, structName, recordNamespace)
+        val valueConverter = createConverterToAvro(
+          valueType,
+          avroType.getValueType,
+          avroType.getValueType.getName,
+          recordNamespace)
         (item: Any) => {
           if (item == null) {
             null
@@ -287,23 +296,44 @@ object SchemaConverters {
           }
         }
       case structType: StructType =>
-        val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
-        val schema: Schema = SchemaConverters.convertSparkStructTypeToAvro(
-          structType, builder, recordNamespace)
+        // The Avro schema is the user-supplied one, not one generated from the dataset
+        val schema: Schema = avroType
+        // Converters are built in the structType field order, which was built from
+        //   schema.map { x => SchemaConverters.toSqlType(x).dataType }.get
+        // where schema is the Avro schema => it is not the DataFrame field order!
         val fieldConverters = structType.fields.map(field =>
-          createConverterToAvro(field.dataType, field.name, recordNamespace))
+          createConverterToAvro(
+            field.dataType,
+            schema.getField(field.name).schema(),
+            field.name,
+            recordNamespace))
         (item: Any) => {
           if (item == null) {
             null
           } else {
             val record = new Record(schema)
+            val row = item.asInstanceOf[Row]
+            val rowIterator = row.toSeq.iterator
+            val fieldNamesIterator = structType.fieldNames.iterator
             val convertersIterator = fieldConverters.iterator
-            val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
-            val rowIterator = item.asInstanceOf[Row].toSeq.iterator
-            while (convertersIterator.hasNext) {
-              val converter = convertersIterator.next()
-              record.put(fieldNamesIterator.next(), converter(rowIterator.next()))
+            if (row.schema == null) {
+              // It seems we can end up here with a Row that carries no schema ...
+
+              // No schema: the fields in the Row have to be in the expected order
+              // (defined by the Avro schema)
+              while (convertersIterator.hasNext) {
+                val converter = convertersIterator.next()
+                record.put(fieldNamesIterator.next(), converter(rowIterator.next()))
+              }
+            } else {
+              // The Row may come from a DataFrame with a different field order:
+              // take the fields by name and not by position
+              while (fieldNamesIterator.hasNext) {
+                val fieldname = fieldNamesIterator.next()
+                val converter = convertersIterator.next()
+                record.put(fieldname, converter(row.get(row.fieldIndex(fieldname))))
+              }
             }
             record
           }
diff --git a/core/src/test/scala/org/apache/spark/sql/AvroRecordSuite.scala b/core/src/test/scala/org/apache/spark/sql/AvroRecordSuite.scala
index 4528d40b..ea12f054 100644
--- a/core/src/test/scala/org/apache/spark/sql/AvroRecordSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/AvroRecordSuite.scala
@@ -54,7 +54,7 @@ class AvroRecordSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAft
     println(sqlUser1)
     val schema = SchemaConverters.toSqlType(avroSchema)
     println(s"\nSqlschema: $schema")
-    val avroUser1 = SchemaConverters.createConverterToAvro(schema.dataType, "avro", "example.avro")(sqlUser1)
+    val avroUser1 = SchemaConverters.createConverterToAvro(schema.dataType, avroSchema, "avro", "example.avro")(sqlUser1)
     val avroByte = AvroSerde.serialize(avroUser1, avroSchema)
     val avroUser11 = AvroSerde.deserialize(avroByte, avroSchema)
     println(s"$avroUser1")
@@ -77,7 +77,7 @@ class AvroRecordSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAft
     println(sqlConv)
     val sqlSchema = SchemaConverters.toSqlType(avroSchema)
     println(s"\nSqlschema: $sqlSchema")
-    val avroData = SchemaConverters.createConverterToAvro(sqlSchema.dataType, "avro", "example.avro")(sqlConv)
+    val avroData = SchemaConverters.createConverterToAvro(sqlSchema.dataType, avroSchema, "avro", "example.avro")(sqlConv)
     val avroBytes = AvroSerde.serialize(avroData, avroSchema)
     val desData = AvroSerde.deserialize(avroBytes, avroSchema)
     println(s"$desData")
@@ -107,7 +107,7 @@ class AvroRecordSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAft
     println(sqlConv)
     val sqlSchema = SchemaConverters.toSqlType(avroSchema)
     println(s"\nSqlschema: $sqlSchema")
-    val avroData = SchemaConverters.createConverterToAvro(sqlSchema.dataType, "avro", "example.avro")(sqlConv)
+    val avroData = SchemaConverters.createConverterToAvro(sqlSchema.dataType, avroSchema, "avro", "example.avro")(sqlConv)
     val avroBytes = AvroSerde.serialize(avroData, avroSchema)
     val desData = AvroSerde.deserialize(avroBytes, avroSchema)
     println(s"$desData")
@@ -238,7 +238,7 @@ class AvroRecordSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAft
     val sqlRec = SchemaConverters.createConverterToSQL(avroComplex)(avroRec)
     println(s"\nsqlRec: $sqlRec")
-    val avroRec1 = SchemaConverters.createConverterToAvro(schema.dataType, "test_schema", "example.avro")(sqlRec)
+    val avroRec1 = SchemaConverters.createConverterToAvro(schema.dataType, avroComplex, "test_schema", "example.avro")(sqlRec)
     println(s"\navroRec1: $avroRec1")
     val avroByte = AvroSerde.serialize(avroRec1, avroComplex)
     println("\nserialize")
@@ -247,4 +247,42 @@ class AvroRecordSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAft
     val sqlRec1 = SchemaConverters.createConverterToSQL(avroComplex)(avroRec11)
     println(s"sqlRec1: $sqlRec1")
   }
+
+  test("avro not dependent on schema field order") {
+    val schemaString =
+      s"""{"namespace": "example.avro",
+         |  "type": "record", "name": "User",
+         |  "fields": [ {"name": "name", "type": "string"},
+         |              {"name": "bool", "type": "boolean"} ] }""".stripMargin
+    val avroSchema: Schema = {
+      val p = new Schema.Parser
+      p.parse(schemaString)
+    }
+    val schemaDatasetString =
+      s"""{"namespace": "example.avro",
+         |  "type": "record", "name": "User",
+         |  "fields": [ {"name": "bool", "type": "boolean"},
+         |              {"name": "name", "type": "string"} ] }""".stripMargin
+    val avroDatasetSchema: Schema = {
+      val p = new Schema.Parser
+      p.parse(schemaDatasetString)
+    }
+
+    val user1 = new GenericData.Record(avroDatasetSchema)
+    user1.put("name", "Alyssa")
+    user1.put("bool", true)
+
+    val user2 = new GenericData.Record(avroDatasetSchema)
+    user2.put("name", "Ben")
+    user2.put("bool", false)
+
+    val sqlUser1 = SchemaConverters.createConverterToSQL(avroDatasetSchema)(user1)
+    println(s"user1 from sql: $sqlUser1")
+    val schema = SchemaConverters.toSqlType(avroDatasetSchema)
+    println(s"\nSqlschema: $schema")
+    val avroUser1 = SchemaConverters.createConverterToAvro(schema.dataType, avroSchema, "avro", "example.avro")(sqlUser1)
+    val avroByte = AvroSerde.serialize(avroUser1, avroSchema)
+    val avroUser11 = AvroSerde.deserialize(avroByte, avroSchema)
+    println(s"user1 deserialized: $avroUser11")
+  }
 }
diff --git a/examples/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/AvroRecord.scala b/examples/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/AvroRecord.scala
index d90f554a..ea4cc5d7 100644
--- a/examples/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/AvroRecord.scala
+++ b/examples/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/AvroRecord.scala
@@ -47,7 +47,7 @@ object AvroRecord {
     println(sqlUser1)
     val schema = SchemaConverters.toSqlType(avroSchema)
     println(s"\nSqlschema: $schema")
-    val avroUser1 = SchemaConverters.createConverterToAvro(schema.dataType, "avro", "example.avro")(sqlUser1)
+    val avroUser1 = SchemaConverters.createConverterToAvro(schema.dataType, avroSchema, "avro", "example.avro")(sqlUser1)
     val avroByte = AvroSerde.serialize(avroUser1, avroSchema)
     val avroUser11 = AvroSerde.deserialize(avroByte, avroSchema)
     println(s"$avroUser1")
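
A minimal caller-side sketch (not part of the patch) of what the signature change means in practice: createConverterToAvro now takes the user-supplied Avro schema as its second argument, and when the incoming Row carries a StructType it resolves struct fields by name rather than by position. This mirrors the new "avro not dependent on schema field order" test above. The package, import paths and the AvroSerde/SchemaConverters object names are inferred from the file paths in this diff; GenericRowWithSchema is only used here as a convenient way to build a Row that carries a schema.

package org.apache.spark.sql

import org.apache.avro.Schema
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.execution.datasources.hbase.types.{AvroSerde, SchemaConverters}
import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}

object AvroFieldOrderSketch {
  def main(args: Array[String]): Unit = {
    // Target (user-supplied) Avro schema: "name" first, then "bool".
    val avroSchema = new Schema.Parser().parse(
      """{"namespace": "example.avro", "type": "record", "name": "User",
        | "fields": [ {"name": "name", "type": "string"},
        |             {"name": "bool", "type": "boolean"} ] }""".stripMargin)

    // Catalyst schema declared in the opposite order: "bool" first, then "name".
    val sparkSchema = StructType(Seq(
      StructField("bool", BooleanType),
      StructField("name", StringType)))
    val row = new GenericRowWithSchema(Array(true, "Alyssa"), sparkSchema)

    // New signature: (dataType, avroType, currentFieldName, recordNamespace).
    val toAvro = SchemaConverters.createConverterToAvro(
      sparkSchema, avroSchema, "avro", "example.avro")

    // The Row carries a schema, so the patched converter takes fields by name and
    // the order mismatch between sparkSchema and avroSchema is harmless.
    val record = toAvro(row)
    val bytes = AvroSerde.serialize(record, avroSchema)
    println(AvroSerde.deserialize(bytes, avroSchema))
  }
}

A Row without an attached schema (row.schema == null) still falls back to positional matching, as the first branch of the patched StructType case shows.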