From 979a6703a00d96c4cd3cddb53b57008f6fb6f128 Mon Sep 17 00:00:00 2001
From: Tu-maimes
Date: Fri, 3 Jan 2025 09:04:01 +0800
Subject: [PATCH] [BUG] Fixed avro format support for storing null (#8424)

Co-authored-by: Tu-maimes
---
Review notes (text between the three-dash marker and the first diff is
commentary, not part of the commit message):

 * AvroToRowConverter.convertField now returns null for a null value before
   entering the per-type switch, so no branch dereferences a null.
 * seaTunnelDataType2AvroDataType now returns a union whose first branch is
   NULL for STRING, BYTES, TINYINT/SMALLINT/INT, BIGINT, FLOAT, DOUBLE,
   BOOLEAN, MAP, ARRAY, the decimal branch, TIMESTAMP and DATE. Because MAP
   values and ARRAY elements come from the recursive call, they become
   nullable unions as well for those types. The ROW and NULL branches are
   not modified by these hunks.
 * In testSerializationValueNull the three trailing null checks use
   Assertions.assertNull(actual) instead of assertEquals(actual, null),
   which passed the actual value in the "expected" position. The post-image
   blob id on that file's index line was generated before this change.
 * NOTE(review): blank and wrapped context lines were laid out to match the
   line counts in each hunk header; confirm against the target tree before
   applying.

 .../format/avro/AvroToRowConverter.java       |  4 ++
 ...SeaTunnelRowTypeToAvroSchemaConverter.java | 43 ++++++++++-----
 .../avro/AvroSerializationSchemaTest.java     | 55 +++++++++++++++++++
 3 files changed, 89 insertions(+), 13 deletions(-)

diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
index e80b78ee83f..84b10636001 100644
--- a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
@@ -38,6 +38,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 public class AvroToRowConverter implements Serializable {
 
@@ -82,6 +83,9 @@ public SeaTunnelRow converter(GenericRecord record, SeaTunnelRowType rowType) {
     }
 
     private Object convertField(SeaTunnelDataType dataType, Object val) {
+        if (Objects.isNull(val)) {
+            return null;
+        }
         switch (dataType.getSqlType()) {
             case STRING:
                 return val.toString();
diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java
index 0a990f3fc6f..56a65d3e53d 100644
--- a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java
@@ -58,28 +58,39 @@ private static Schema seaTunnelDataType2AvroDataType(
 
         switch (seaTunnelDataType.getSqlType()) {
             case STRING:
-                return Schema.create(Schema.Type.STRING);
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING));
             case BYTES:
-                return Schema.create(Schema.Type.BYTES);
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES));
             case TINYINT:
             case SMALLINT:
             case INT:
-                return Schema.create(Schema.Type.INT);
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.INT));
             case BIGINT:
-                return Schema.create(Schema.Type.LONG);
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG));
             case FLOAT:
-                return Schema.create(Schema.Type.FLOAT);
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.FLOAT));
             case DOUBLE:
-                return Schema.create(Schema.Type.DOUBLE);
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.DOUBLE));
             case BOOLEAN:
-                return Schema.create(Schema.Type.BOOLEAN);
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BOOLEAN));
             case MAP:
                 SeaTunnelDataType valueType = ((MapType) seaTunnelDataType).getValueType();
-                return Schema.createMap(seaTunnelDataType2AvroDataType(fieldName, valueType));
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL),
+                        Schema.createMap(seaTunnelDataType2AvroDataType(fieldName, valueType)));
             case ARRAY:
                 SeaTunnelDataType elementType =
                         ((ArrayType) seaTunnelDataType).getElementType();
-                return Schema.createArray(seaTunnelDataType2AvroDataType(fieldName, elementType));
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL),
+                        Schema.createArray(seaTunnelDataType2AvroDataType(fieldName, elementType)));
             case ROW:
                 SeaTunnelDataType[] fieldTypes =
                         ((SeaTunnelRowType) seaTunnelDataType).getFieldTypes();
@@ -93,12 +104,18 @@ private static Schema seaTunnelDataType2AvroDataType(
                 int precision = ((DecimalType) seaTunnelDataType).getPrecision();
                 int scale = ((DecimalType) seaTunnelDataType).getScale();
                 LogicalTypes.Decimal decimal = LogicalTypes.decimal(precision, scale);
-                return decimal.addToSchema(Schema.create(Schema.Type.BYTES));
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL),
+                        decimal.addToSchema(Schema.create(Schema.Type.BYTES)));
             case TIMESTAMP:
-                return LogicalTypes.localTimestampMillis()
-                        .addToSchema(Schema.create(Schema.Type.LONG));
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL),
+                        LogicalTypes.localTimestampMillis()
+                                .addToSchema(Schema.create(Schema.Type.LONG)));
             case DATE:
-                return LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL),
+                        LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)));
             case NULL:
                 return Schema.create(Schema.Type.NULL);
             default:
diff --git a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
index 42b8029f16c..52ba7d76e68 100644
--- a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
+++ b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
@@ -177,4 +177,59 @@ public void testSerialization() throws IOException {
         LocalDateTime localDateTime1 = (LocalDateTime) subRow.getField(13);
         Assertions.assertEquals(localDateTime1.compareTo(localDateTime), 0);
     }
+
+    private SeaTunnelRow buildSeaTunnelRowValueNull() {
+        SeaTunnelRow subSeaTunnelRow = new SeaTunnelRow(14);
+        subSeaTunnelRow.setField(0, null);
+        subSeaTunnelRow.setField(1, null);
+        subSeaTunnelRow.setField(2, null);
+        subSeaTunnelRow.setField(3, null);
+        subSeaTunnelRow.setField(4, null);
+        subSeaTunnelRow.setField(5, null);
+        subSeaTunnelRow.setField(6, null);
+        subSeaTunnelRow.setField(7, null);
+        subSeaTunnelRow.setField(8, null);
+        subSeaTunnelRow.setField(9, null);
+        subSeaTunnelRow.setField(10, null);
+        subSeaTunnelRow.setField(11, null);
+        subSeaTunnelRow.setField(12, null);
+        subSeaTunnelRow.setField(13, null);
+
+        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(15);
+        seaTunnelRow.setField(0, null);
+        seaTunnelRow.setField(1, null);
+        seaTunnelRow.setField(2, null);
+        seaTunnelRow.setField(3, null);
+        seaTunnelRow.setField(4, null);
+        seaTunnelRow.setField(5, null);
+        seaTunnelRow.setField(6, null);
+        seaTunnelRow.setField(7, null);
+        seaTunnelRow.setField(8, null);
+        seaTunnelRow.setField(9, null);
+        seaTunnelRow.setField(10, null);
+        seaTunnelRow.setField(11, null);
+        seaTunnelRow.setField(12, null);
+        seaTunnelRow.setField(13, null);
+        seaTunnelRow.setField(14, subSeaTunnelRow);
+        return seaTunnelRow;
+    }
+
+    @Test
+    public void testSerializationValueNull() throws IOException {
+        SeaTunnelRowType rowType = buildSeaTunnelRowType();
+        CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("", "", "", "test", rowType);
+        SeaTunnelRow seaTunnelRow = buildSeaTunnelRowValueNull();
+        AvroSerializationSchema serializationSchema = new AvroSerializationSchema(rowType);
+        byte[] bytes = serializationSchema.serialize(seaTunnelRow);
+        AvroDeserializationSchema deserializationSchema =
+                new AvroDeserializationSchema(catalogTable);
+        SeaTunnelRow deserialize = deserializationSchema.deserialize(bytes);
+        String[] strArray1 = (String[]) seaTunnelRow.getField(1);
+        String[] strArray2 = (String[]) deserialize.getField(1);
+        Assertions.assertArrayEquals(strArray1, strArray2);
+        SeaTunnelRow subRow = (SeaTunnelRow) deserialize.getField(14);
+        Assertions.assertNull(subRow.getField(9));
+        Assertions.assertNull(subRow.getField(12));
+        Assertions.assertNull(subRow.getField(13));
+    }
 }