Skip to content

Commit

Permalink
Merge branch 'master' into hotfix_1821
Browse files Browse the repository at this point in the history
  • Loading branch information
ll076110 authored Oct 18, 2023
2 parents 70c00af + e5c0066 commit 9b075a0
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@
import com.dtstack.chunjun.element.column.IntColumn;
import com.dtstack.chunjun.element.column.LongColumn;
import com.dtstack.chunjun.element.column.ShortColumn;
import com.dtstack.chunjun.element.column.SqlDateColumn;
import com.dtstack.chunjun.element.column.StringColumn;
import com.dtstack.chunjun.element.column.TimestampColumn;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
import com.dtstack.chunjun.throwable.UnsupportedTypeException;
import com.dtstack.chunjun.throwable.WriteRecordException;
import com.dtstack.chunjun.util.ColumnTypeUtil;
import com.dtstack.chunjun.util.DateUtil;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
Expand Down Expand Up @@ -166,7 +166,10 @@ protected IDeserializationConverter createInternalConverter(String type) {
TimestampColumn::new;
case "DATE":
return (IDeserializationConverter<String, AbstractBaseColumn>)
val -> new TimestampColumn(DateUtil.getTimestampFromStr(val));
val ->
val == null
? new SqlDateColumn(null)
: new SqlDateColumn(Date.valueOf(val));
case "BINARY":
return (IDeserializationConverter<byte[], AbstractBaseColumn>) BytesColumn::new;
case "ARRAY":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,10 @@ protected IDeserializationConverter createInternalConverter(String type) {
};
case "DATE":
return (IDeserializationConverter<String, AbstractBaseColumn>)
val -> {
Timestamp timestamp = DateUtil.getTimestampFromStr(val);
if (timestamp == null) {
return new SqlDateColumn(null);
} else {
return new SqlDateColumn(
Date.valueOf(timestamp.toLocalDateTime().toLocalDate()));
}
};
val ->
val == null
? new SqlDateColumn(null)
: new SqlDateColumn(Date.valueOf(val));
case "BINARY":
case "ARRAY":
case "MAP":
Expand Down
41 changes: 41 additions & 0 deletions chunjun-connectors/chunjun-connector-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,47 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
<exclude>ch.qos.logback:*</exclude>
<exclude>ch.qos.reload4j:*</exclude>
<exclude>commons-logging:*</exclude>
<exclude>log4j:log4j</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>com.dtstack.chunjun.connector.kafka.com.google.common</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static com.dtstack.chunjun.connector.kudu.table.KuduOptions.SCANNER_BATCH_SIZE_BYTES;
import static com.dtstack.chunjun.connector.kudu.table.KuduOptions.TABLE_NAME;
import static com.dtstack.chunjun.connector.kudu.table.KuduOptions.WORKER_COUNT;
import static com.dtstack.chunjun.connector.kudu.table.KuduOptions.WRITE_MODE;
import static com.dtstack.chunjun.lookup.options.LookupOptions.LOOKUP_ASYNC_TIMEOUT;
import static com.dtstack.chunjun.lookup.options.LookupOptions.LOOKUP_CACHE_MAX_ROWS;
import static com.dtstack.chunjun.lookup.options.LookupOptions.LOOKUP_CACHE_PERIOD;
Expand Down Expand Up @@ -146,6 +147,7 @@ public Set<ConfigOption<?>> optionalOptions() {
optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);
optionalOptions.add(SINK_MAX_RETRIES);
optionalOptions.add(SINK_PARALLELISM);
optionalOptions.add(WRITE_MODE);

// kerberos
optionalOptions.add(PRINCIPAL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,17 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) {
new BigDecimal((BigInteger) val, 0), precision, scale)
: DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
case DATE:
return val -> Long.valueOf(((Timestamp) val).getTime() / 1000).intValue();
return val ->
val instanceof Timestamp
? (int)
(((Timestamp) val)
.toLocalDateTime()
.toLocalDate()
.toEpochDay())
: (int)
((Date.valueOf(String.valueOf(val)))
.toLocalDate()
.toEpochDay());
case TIME_WITHOUT_TIME_ZONE:
return val ->
(int)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.dtstack.chunjun.config.CommonConfig;
import com.dtstack.chunjun.config.FieldConfig;
import com.dtstack.chunjun.element.AbstractBaseColumn;
import com.dtstack.chunjun.element.column.NullColumn;
import com.dtstack.chunjun.element.column.StringColumn;
import com.dtstack.chunjun.enums.ColumnType;
import com.dtstack.chunjun.throwable.ChunJunException;
Expand Down Expand Up @@ -93,7 +94,7 @@ public AbstractRowConverter(int converterSize, CommonConfig commonConfig) {
protected IDeserializationConverter wrapIntoNullableInternalConverter(
IDeserializationConverter IDeserializationConverter) {
return val -> {
if (val == null) {
if (val == null || val instanceof NullColumn) {
return null;
} else {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ public void collect(Object data, Throwable cause, String field, long globalError
entity.setFieldName(field);
entity.setErrorMessage(ExceptionUtil.getErrorMessage(cause));

consumer.offer(entity, globalErrors);
errorCounter.add(1L);
consumer.offer(entity, globalErrors);
}

public String toString(Object data) {
Expand Down
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@
<artifactId>netty</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>commons-math3</artifactId>
<groupId>org.apache.commons</groupId>
</exclusion>
</exclusions>
</dependency>

Expand Down

0 comments on commit 9b075a0

Please sign in to comment.