From 49c68737daf8b83811ce6537edb6e4a3a5f12368 Mon Sep 17 00:00:00 2001 From: david-gao1 <49160235+david-gao1@users.noreply.github.com> Date: Thu, 21 Sep 2023 16:16:38 +0800 Subject: [PATCH 1/6] [hotfix-#1823][connector][oracle] oracle Connector incorrectly consumed date-type data (#1824) Co-authored-by: gaoliang --- .../oracle/converter/OracleSqlConverter.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/converter/OracleSqlConverter.java b/chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/converter/OracleSqlConverter.java index 6d1748e6a2..a240235631 100644 --- a/chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/converter/OracleSqlConverter.java +++ b/chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/converter/OracleSqlConverter.java @@ -124,7 +124,17 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) { new BigDecimal((BigInteger) val, 0), precision, scale) : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale); case DATE: - return val -> Long.valueOf(((Timestamp) val).getTime() / 1000).intValue(); + return val -> + val instanceof Timestamp + ? (int) + (((Timestamp) val) + .toLocalDateTime() + .toLocalDate() + .toEpochDay()) + : (int) + ((Date.valueOf(String.valueOf(val))) + .toLocalDate() + .toEpochDay()); case TIME_WITHOUT_TIME_ZONE: return val -> (int) From 5109c2d2eb1c1614d1a792ff7fce646e37bd0e19 Mon Sep 17 00:00:00 2001 From: OT-QX <43375919+ll076110@users.noreply.github.com> Date: Mon, 25 Sep 2023 10:28:22 +0800 Subject: [PATCH 2/6] [hotfix-#1827][kudu]Fix flinksql kudu write mode parameter does not take effect (#1828) Co-authored-by: OT-XY --- .../chunjun/connector/kudu/table/KuduDynamicTableFactory.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/table/KuduDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/table/KuduDynamicTableFactory.java index 3d4308f4a0..856e052c66 100644 --- a/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/table/KuduDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/table/KuduDynamicTableFactory.java @@ -44,6 +44,7 @@ import static com.dtstack.chunjun.connector.kudu.table.KuduOptions.SCANNER_BATCH_SIZE_BYTES; import static com.dtstack.chunjun.connector.kudu.table.KuduOptions.TABLE_NAME; import static com.dtstack.chunjun.connector.kudu.table.KuduOptions.WORKER_COUNT; +import static com.dtstack.chunjun.connector.kudu.table.KuduOptions.WRITE_MODE; import static com.dtstack.chunjun.lookup.options.LookupOptions.LOOKUP_ASYNC_TIMEOUT; import static com.dtstack.chunjun.lookup.options.LookupOptions.LOOKUP_CACHE_MAX_ROWS; import static com.dtstack.chunjun.lookup.options.LookupOptions.LOOKUP_CACHE_PERIOD; @@ -146,6 +147,7 @@ public Set> optionalOptions() { optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL); optionalOptions.add(SINK_MAX_RETRIES); optionalOptions.add(SINK_PARALLELISM); + optionalOptions.add(WRITE_MODE); // kerberos optionalOptions.add(PRINCIPAL); From 8d105fc15cc076f75a66725e18ceb0540581c3f0 Mon Sep 17 00:00:00 2001 From: OT-QX <43375919+ll076110@users.noreply.github.com> Date: Mon, 25 Sep 2023 15:29:33 +0800 Subject: [PATCH 3/6] [hotfix-#1825][hdfs]Synchronizing text and parquet table data, the speed is too slow when the table field contains date type (#1826) Co-authored-by: OT-XY --- .../hdfs/converter/HdfsParquetSyncConverter.java | 7 +++++-- .../hdfs/converter/HdfsTextSyncConverter.java | 13 ++++--------- .../chunjun/converter/AbstractRowConverter.java | 3 ++- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsParquetSyncConverter.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsParquetSyncConverter.java index 09f4f1d505..b33874f1da 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsParquetSyncConverter.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsParquetSyncConverter.java @@ -35,13 +35,13 @@ import com.dtstack.chunjun.element.column.IntColumn; import com.dtstack.chunjun.element.column.LongColumn; import com.dtstack.chunjun.element.column.ShortColumn; +import com.dtstack.chunjun.element.column.SqlDateColumn; import com.dtstack.chunjun.element.column.StringColumn; import com.dtstack.chunjun.element.column.TimestampColumn; import com.dtstack.chunjun.throwable.ChunJunRuntimeException; import com.dtstack.chunjun.throwable.UnsupportedTypeException; import com.dtstack.chunjun.throwable.WriteRecordException; import com.dtstack.chunjun.util.ColumnTypeUtil; -import com.dtstack.chunjun.util.DateUtil; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; @@ -166,7 +166,10 @@ protected IDeserializationConverter createInternalConverter(String type) { TimestampColumn::new; case "DATE": return (IDeserializationConverter) - val -> new TimestampColumn(DateUtil.getTimestampFromStr(val)); + val -> + val == null + ? new SqlDateColumn(null) + : new SqlDateColumn(Date.valueOf(val)); case "BINARY": return (IDeserializationConverter) BytesColumn::new; case "ARRAY": diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextSyncConverter.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextSyncConverter.java index 47414baae0..464c86c48c 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextSyncConverter.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextSyncConverter.java @@ -194,15 +194,10 @@ protected IDeserializationConverter createInternalConverter(String type) { }; case "DATE": return (IDeserializationConverter) - val -> { - Timestamp timestamp = DateUtil.getTimestampFromStr(val); - if (timestamp == null) { - return new SqlDateColumn(null); - } else { - return new SqlDateColumn( - Date.valueOf(timestamp.toLocalDateTime().toLocalDate())); - } - }; + val -> + val == null + ? new SqlDateColumn(null) + : new SqlDateColumn(Date.valueOf(val)); case "BINARY": case "ARRAY": case "MAP": diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractRowConverter.java b/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractRowConverter.java index 4f4190721c..3fff3f3595 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractRowConverter.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractRowConverter.java @@ -21,6 +21,7 @@ import com.dtstack.chunjun.config.CommonConfig; import com.dtstack.chunjun.config.FieldConfig; import com.dtstack.chunjun.element.AbstractBaseColumn; +import com.dtstack.chunjun.element.column.NullColumn; import com.dtstack.chunjun.element.column.StringColumn; import com.dtstack.chunjun.enums.ColumnType; import com.dtstack.chunjun.throwable.ChunJunException; @@ -93,7 +94,7 @@ public AbstractRowConverter(int converterSize, CommonConfig commonConfig) { protected IDeserializationConverter wrapIntoNullableInternalConverter( IDeserializationConverter IDeserializationConverter) { return val -> { - if (val == null) { + if (val == null || val instanceof NullColumn) { return null; } else { try { From 0e6a442f498cc63f3468b3c6dc186b04c06b68c2 Mon Sep 17 00:00:00 2001 From: libailin Date: Thu, 28 Sep 2023 10:51:44 +0800 Subject: [PATCH 4/6] [hotfix-#1829][chunjun] Fixed view checkpoints error report --- pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pom.xml b/pom.xml index 8a708d95ce..5ce3ae4940 100755 --- a/pom.xml +++ b/pom.xml @@ -285,6 +285,10 @@ netty io.netty + + commons-math3 + org.apache.commons + From d7d2c5cb336c19307a28301d3b070896a45a08b5 Mon Sep 17 00:00:00 2001 From: Tang Date: Fri, 13 Oct 2023 10:55:19 +0800 Subject: [PATCH 5/6] [fix-#1832][dirty] Fixed Prometheus nErrors failing to synchronize last day failure records (#1833) Co-authored-by: tangdelong --- .../java/com/dtstack/chunjun/dirty/manager/DirtyManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/dirty/manager/DirtyManager.java b/chunjun-core/src/main/java/com/dtstack/chunjun/dirty/manager/DirtyManager.java index d217352dae..54456d129a 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/dirty/manager/DirtyManager.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/dirty/manager/DirtyManager.java @@ -131,8 +131,8 @@ public void collect(Object data, Throwable cause, String field, long globalError entity.setFieldName(field); entity.setErrorMessage(ExceptionUtil.getErrorMessage(cause)); - consumer.offer(entity, globalErrors); errorCounter.add(1L); + consumer.offer(entity, globalErrors); } public String toString(Object data) { From e5c0066acf116c1bba89b1301735241698f4c547 Mon Sep 17 00:00:00 2001 From: libailin Date: Tue, 17 Oct 2023 13:49:13 +0800 Subject: [PATCH 6/6] [hotfix-#1834][connector][kafka] Fixed missing dependencies (#1835) --- .../chunjun-connector-kafka/pom.xml | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/chunjun-connectors/chunjun-connector-kafka/pom.xml b/chunjun-connectors/chunjun-connector-kafka/pom.xml index d8716bca56..48c2b7537c 100644 --- a/chunjun-connectors/chunjun-connector-kafka/pom.xml +++ b/chunjun-connectors/chunjun-connector-kafka/pom.xml @@ -89,6 +89,47 @@ + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + false + + + org.slf4j:* + org.apache.logging.log4j:* + ch.qos.logback:* + ch.qos.reload4j:* + commons-logging:* + log4j:log4j + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + com.google.common + com.dtstack.chunjun.connector.kafka.com.google.common + + + + + + org.apache.maven.plugins maven-antrun-plugin