Skip to content

Commit

Permalink
[BUG] Fixed avro format support for storing null (#8424)
Browse files Browse the repository at this point in the history
Co-authored-by: Tu-maimes <[email protected]>
  • Loading branch information
Tu-maimes and Tu-maimes authored Jan 3, 2025
1 parent 3ac977c commit 979a670
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class AvroToRowConverter implements Serializable {

Expand Down Expand Up @@ -82,6 +83,9 @@ public SeaTunnelRow converter(GenericRecord record, SeaTunnelRowType rowType) {
}

private Object convertField(SeaTunnelDataType<?> dataType, Object val) {
if (Objects.isNull(val)) {
return null;
}
switch (dataType.getSqlType()) {
case STRING:
return val.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,28 +58,39 @@ private static Schema seaTunnelDataType2AvroDataType(

switch (seaTunnelDataType.getSqlType()) {
case STRING:
return Schema.create(Schema.Type.STRING);
return Schema.createUnion(
Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING));
case BYTES:
return Schema.create(Schema.Type.BYTES);
return Schema.createUnion(
Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES));
case TINYINT:
case SMALLINT:
case INT:
return Schema.create(Schema.Type.INT);
return Schema.createUnion(
Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.INT));
case BIGINT:
return Schema.create(Schema.Type.LONG);
return Schema.createUnion(
Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG));
case FLOAT:
return Schema.create(Schema.Type.FLOAT);
return Schema.createUnion(
Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.FLOAT));
case DOUBLE:
return Schema.create(Schema.Type.DOUBLE);
return Schema.createUnion(
Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.DOUBLE));
case BOOLEAN:
return Schema.create(Schema.Type.BOOLEAN);
return Schema.createUnion(
Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BOOLEAN));
case MAP:
SeaTunnelDataType<?> valueType = ((MapType<?, ?>) seaTunnelDataType).getValueType();
return Schema.createMap(seaTunnelDataType2AvroDataType(fieldName, valueType));
return Schema.createUnion(
Schema.create(Schema.Type.NULL),
Schema.createMap(seaTunnelDataType2AvroDataType(fieldName, valueType)));
case ARRAY:
SeaTunnelDataType<?> elementType =
((ArrayType<?, ?>) seaTunnelDataType).getElementType();
return Schema.createArray(seaTunnelDataType2AvroDataType(fieldName, elementType));
return Schema.createUnion(
Schema.create(Schema.Type.NULL),
Schema.createArray(seaTunnelDataType2AvroDataType(fieldName, elementType)));
case ROW:
SeaTunnelDataType<?>[] fieldTypes =
((SeaTunnelRowType) seaTunnelDataType).getFieldTypes();
Expand All @@ -93,12 +104,18 @@ private static Schema seaTunnelDataType2AvroDataType(
int precision = ((DecimalType) seaTunnelDataType).getPrecision();
int scale = ((DecimalType) seaTunnelDataType).getScale();
LogicalTypes.Decimal decimal = LogicalTypes.decimal(precision, scale);
return decimal.addToSchema(Schema.create(Schema.Type.BYTES));
return Schema.createUnion(
Schema.create(Schema.Type.NULL),
decimal.addToSchema(Schema.create(Schema.Type.BYTES)));
case TIMESTAMP:
return LogicalTypes.localTimestampMillis()
.addToSchema(Schema.create(Schema.Type.LONG));
return Schema.createUnion(
Schema.create(Schema.Type.NULL),
LogicalTypes.localTimestampMillis()
.addToSchema(Schema.create(Schema.Type.LONG)));
case DATE:
return LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
return Schema.createUnion(
Schema.create(Schema.Type.NULL),
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)));
case NULL:
return Schema.create(Schema.Type.NULL);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,4 +177,59 @@ public void testSerialization() throws IOException {
LocalDateTime localDateTime1 = (LocalDateTime) subRow.getField(13);
Assertions.assertEquals(localDateTime1.compareTo(localDateTime), 0);
}

private SeaTunnelRow buildSeaTunnelRowValueNull() {
SeaTunnelRow subSeaTunnelRow = new SeaTunnelRow(14);
subSeaTunnelRow.setField(0, null);
subSeaTunnelRow.setField(1, null);
subSeaTunnelRow.setField(2, null);
subSeaTunnelRow.setField(3, null);
subSeaTunnelRow.setField(4, null);
subSeaTunnelRow.setField(5, null);
subSeaTunnelRow.setField(6, null);
subSeaTunnelRow.setField(7, null);
subSeaTunnelRow.setField(8, null);
subSeaTunnelRow.setField(9, null);
subSeaTunnelRow.setField(10, null);
subSeaTunnelRow.setField(11, null);
subSeaTunnelRow.setField(12, null);
subSeaTunnelRow.setField(13, null);

SeaTunnelRow seaTunnelRow = new SeaTunnelRow(15);
seaTunnelRow.setField(0, null);
seaTunnelRow.setField(1, null);
seaTunnelRow.setField(2, null);
seaTunnelRow.setField(3, null);
seaTunnelRow.setField(4, null);
seaTunnelRow.setField(5, null);
seaTunnelRow.setField(6, null);
seaTunnelRow.setField(7, null);
seaTunnelRow.setField(8, null);
seaTunnelRow.setField(9, null);
seaTunnelRow.setField(10, null);
seaTunnelRow.setField(11, null);
seaTunnelRow.setField(12, null);
seaTunnelRow.setField(13, null);
seaTunnelRow.setField(14, subSeaTunnelRow);
return seaTunnelRow;
}

@Test
public void testSerializationValueNull() throws IOException {
SeaTunnelRowType rowType = buildSeaTunnelRowType();
CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("", "", "", "test", rowType);
SeaTunnelRow seaTunnelRow = buildSeaTunnelRowValueNull();
AvroSerializationSchema serializationSchema = new AvroSerializationSchema(rowType);
byte[] bytes = serializationSchema.serialize(seaTunnelRow);
AvroDeserializationSchema deserializationSchema =
new AvroDeserializationSchema(catalogTable);
SeaTunnelRow deserialize = deserializationSchema.deserialize(bytes);
String[] strArray1 = (String[]) seaTunnelRow.getField(1);
String[] strArray2 = (String[]) deserialize.getField(1);
Assertions.assertArrayEquals(strArray1, strArray2);
SeaTunnelRow subRow = (SeaTunnelRow) deserialize.getField(14);
Assertions.assertEquals(subRow.getField(9), null);
Assertions.assertEquals(subRow.getField(12), null);
Assertions.assertEquals(subRow.getField(13), null);
}
}

0 comments on commit 979a670

Please sign in to comment.