From 942f68af397795503325e779cecd41a11e2fc5ec Mon Sep 17 00:00:00 2001 From: OT-XY Date: Thu, 21 Sep 2023 16:04:24 +0800 Subject: [PATCH] [hotfix-#1825][hdfs]Synchronizing text and parquet table data, the speed is too slow when the table field contains date type --- .../hdfs/converter/HdfsParquetSyncConverter.java | 7 +++++-- .../hdfs/converter/HdfsTextSyncConverter.java | 13 ++++--------- .../chunjun/converter/AbstractRowConverter.java | 3 ++- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsParquetSyncConverter.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsParquetSyncConverter.java index 09f4f1d505..b33874f1da 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsParquetSyncConverter.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsParquetSyncConverter.java @@ -35,13 +35,13 @@ import com.dtstack.chunjun.element.column.IntColumn; import com.dtstack.chunjun.element.column.LongColumn; import com.dtstack.chunjun.element.column.ShortColumn; +import com.dtstack.chunjun.element.column.SqlDateColumn; import com.dtstack.chunjun.element.column.StringColumn; import com.dtstack.chunjun.element.column.TimestampColumn; import com.dtstack.chunjun.throwable.ChunJunRuntimeException; import com.dtstack.chunjun.throwable.UnsupportedTypeException; import com.dtstack.chunjun.throwable.WriteRecordException; import com.dtstack.chunjun.util.ColumnTypeUtil; -import com.dtstack.chunjun.util.DateUtil; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; @@ -166,7 +166,10 @@ protected IDeserializationConverter createInternalConverter(String type) { TimestampColumn::new; case "DATE": return (IDeserializationConverter) - val -> new TimestampColumn(DateUtil.getTimestampFromStr(val)); + val -> + val == null + ? new SqlDateColumn(null) + : new SqlDateColumn(Date.valueOf(val)); case "BINARY": return (IDeserializationConverter) BytesColumn::new; case "ARRAY": diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextSyncConverter.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextSyncConverter.java index 47414baae0..464c86c48c 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextSyncConverter.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextSyncConverter.java @@ -194,15 +194,10 @@ protected IDeserializationConverter createInternalConverter(String type) { }; case "DATE": return (IDeserializationConverter) - val -> { - Timestamp timestamp = DateUtil.getTimestampFromStr(val); - if (timestamp == null) { - return new SqlDateColumn(null); - } else { - return new SqlDateColumn( - Date.valueOf(timestamp.toLocalDateTime().toLocalDate())); - } - }; + val -> + val == null + ? new SqlDateColumn(null) + : new SqlDateColumn(Date.valueOf(val)); case "BINARY": case "ARRAY": case "MAP": diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractRowConverter.java b/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractRowConverter.java index 4f4190721c..3fff3f3595 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractRowConverter.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractRowConverter.java @@ -21,6 +21,7 @@ import com.dtstack.chunjun.config.CommonConfig; import com.dtstack.chunjun.config.FieldConfig; import com.dtstack.chunjun.element.AbstractBaseColumn; +import com.dtstack.chunjun.element.column.NullColumn; import com.dtstack.chunjun.element.column.StringColumn; import com.dtstack.chunjun.enums.ColumnType; import com.dtstack.chunjun.throwable.ChunJunException; @@ -93,7 +94,7 @@ public AbstractRowConverter(int converterSize, CommonConfig commonConfig) { protected IDeserializationConverter wrapIntoNullableInternalConverter( IDeserializationConverter IDeserializationConverter) { return val -> { - if (val == null) { + if (val == null || val instanceof NullColumn) { return null; } else { try {