Skip to content

Commit

Permalink
[hotfix-#1825][hdfs]Synchronizing text and parquet table data, the sp…
Browse files Browse the repository at this point in the history
…eed is too slow when the table field contains date type
  • Loading branch information
OT-XY committed Sep 21, 2023
1 parent da35245 commit 942f68a
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@
import com.dtstack.chunjun.element.column.IntColumn;
import com.dtstack.chunjun.element.column.LongColumn;
import com.dtstack.chunjun.element.column.ShortColumn;
import com.dtstack.chunjun.element.column.SqlDateColumn;
import com.dtstack.chunjun.element.column.StringColumn;
import com.dtstack.chunjun.element.column.TimestampColumn;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
import com.dtstack.chunjun.throwable.UnsupportedTypeException;
import com.dtstack.chunjun.throwable.WriteRecordException;
import com.dtstack.chunjun.util.ColumnTypeUtil;
import com.dtstack.chunjun.util.DateUtil;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
Expand Down Expand Up @@ -166,7 +166,10 @@ protected IDeserializationConverter createInternalConverter(String type) {
TimestampColumn::new;
case "DATE":
return (IDeserializationConverter<String, AbstractBaseColumn>)
val -> new TimestampColumn(DateUtil.getTimestampFromStr(val));
val ->
val == null
? new SqlDateColumn(null)
: new SqlDateColumn(Date.valueOf(val));
case "BINARY":
return (IDeserializationConverter<byte[], AbstractBaseColumn>) BytesColumn::new;
case "ARRAY":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,10 @@ protected IDeserializationConverter createInternalConverter(String type) {
};
case "DATE":
return (IDeserializationConverter<String, AbstractBaseColumn>)
val -> {
Timestamp timestamp = DateUtil.getTimestampFromStr(val);
if (timestamp == null) {
return new SqlDateColumn(null);
} else {
return new SqlDateColumn(
Date.valueOf(timestamp.toLocalDateTime().toLocalDate()));
}
};
val ->
val == null
? new SqlDateColumn(null)
: new SqlDateColumn(Date.valueOf(val));
case "BINARY":
case "ARRAY":
case "MAP":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.dtstack.chunjun.config.CommonConfig;
import com.dtstack.chunjun.config.FieldConfig;
import com.dtstack.chunjun.element.AbstractBaseColumn;
import com.dtstack.chunjun.element.column.NullColumn;
import com.dtstack.chunjun.element.column.StringColumn;
import com.dtstack.chunjun.enums.ColumnType;
import com.dtstack.chunjun.throwable.ChunJunException;
Expand Down Expand Up @@ -93,7 +94,7 @@ public AbstractRowConverter(int converterSize, CommonConfig commonConfig) {
protected IDeserializationConverter wrapIntoNullableInternalConverter(
IDeserializationConverter IDeserializationConverter) {
return val -> {
if (val == null) {
if (val == null || val instanceof NullColumn) {
return null;
} else {
try {
Expand Down

0 comments on commit 942f68a

Please sign in to comment.