Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
wuwenchi committed Dec 18, 2023
1 parent a7ae16c commit 0815591
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ public RowDataSerializer build() {
Preconditions.checkState(CSV.equals(type) && fieldDelimiter != null || JSON.equals(type) || ARROW.equals(type));
Preconditions.checkNotNull(dataTypes);
Preconditions.checkNotNull(fieldNames);
Preconditions.checkArgument(ARROW.equals(type) && !deletable);
return new RowDataSerializer(fieldNames, dataTypes, type, fieldDelimiter, deletable);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,25 @@
package org.apache.doris.flink.sink.writer;

import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
import org.apache.flink.table.types.DataType;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.RowKind;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
Expand Down Expand Up @@ -108,4 +115,32 @@ public void testParseDeleteSign() {
Assert.assertEquals("1", serializer.parseDeleteSign(RowKind.DELETE));
Assert.assertEquals("1", serializer.parseDeleteSign(RowKind.UPDATE_BEFORE));
}
@Test
public void testArrowType() throws Exception {
RowDataSerializer serializer = RowDataSerializer.builder()
.setFieldNames(fieldNames)
.setFieldType(dataTypes)
.setType("arrow")
.enableDelete(false)
.build();

// write data to binary
serializer.initial();
serializer.serialize(rowData);
byte[] serializedValue = serializer.flush().getRow();

// read data from binary
LogicalType[] logicalTypes = TypeConversions.fromDataToLogicalType(dataTypes);
RowType rowType = RowType.of(logicalTypes, fieldNames);
ArrowSerializer arrowSerializer = new ArrowSerializer(rowType, rowType);
ByteArrayInputStream input = new ByteArrayInputStream(serializedValue);
arrowSerializer.open(input, new ByteArrayOutputStream(0));
int cnt = arrowSerializer.load();
RowData data = arrowSerializer.read(0);

Assert.assertEquals(1, cnt);
Assert.assertEquals(3, data.getInt(0));
Assert.assertEquals("test", data.getString(1).toString());
Assert.assertEquals(60.2, data.getDouble(2), 0.001);
}
}

0 comments on commit 0815591

Please sign in to comment.