Skip to content

Commit

Permalink
[hotfix-#1799][connector][jdbc] Fix related issues when the write mod…
Browse files Browse the repository at this point in the history
…e is replace/update
  • Loading branch information
libailin authored and lihongwei committed Sep 5, 2023
1 parent b1f79d6 commit 804d200
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package com.dtstack.chunjun.connector.jdbc.options;

import com.dtstack.chunjun.enums.EWriteMode;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

Expand Down Expand Up @@ -52,4 +54,10 @@ public class JdbcSinkOptions {
.stringType()
.noDefaultValue()
.withDescription("sink.post-sql");

public static final ConfigOption<String> WRITE_MODE =
ConfigOptions.key("write-mode")
.stringType()
.defaultValue(EWriteMode.INSERT.name())
.withDescription("write mode");
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.dtstack.chunjun.config.TypeConfig;
import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig;
import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
import com.dtstack.chunjun.enums.EWriteMode;
import com.dtstack.chunjun.sink.DtOutputFormatSinkFunction;

import org.apache.flink.table.catalog.Column;
Expand Down Expand Up @@ -108,10 +107,6 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
});

jdbcConfig.setColumn(columnList);
jdbcConfig.setMode(
(CollectionUtil.isNullOrEmpty(jdbcConfig.getUniqueKey()))
? EWriteMode.INSERT.name()
: EWriteMode.UPDATE.name());

builder.setColumnNameList(columnNameList);
builder.setColumnTypeList(columnTypeList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.CollectionUtil;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
Expand All @@ -52,8 +53,10 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -103,6 +106,23 @@ protected void openInternal(int taskNumber, int numTasks) {
log.info("updateKey = {}", JsonUtil.toJson(tableIndex));
}
}
if (!CollectionUtil.isNullOrEmpty(jdbcConfig.getUniqueKey())) {
final RowType rowType = rowConverter.getRowType();
List<RowType.RowField> fields = rowType.getFields();
List<RowType.RowField> keyRowFields =
new ArrayList<>(jdbcConfig.getUniqueKey().size());
for (String name : jdbcConfig.getUniqueKey()) {
for (RowType.RowField field : fields) {
if (Objects.equals(name, field.getName())) {
keyRowFields.add(field);
break;
}
}
}
RowType keyRowType = new RowType(keyRowFields);
setKeyRowType(keyRowType);
setKeyRowConverter(jdbcDialect.getRowConverter(keyRowType));
}

buildStatementWrapper();
log.info("subTask[{}}] wait finished", taskNumber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import static com.dtstack.chunjun.connector.jdbc.options.JdbcSinkOptions.SINK_POST_SQL;
import static com.dtstack.chunjun.connector.jdbc.options.JdbcSinkOptions.SINK_PRE_SQL;
import static com.dtstack.chunjun.connector.jdbc.options.JdbcSinkOptions.SINK_SEMANTIC;
import static com.dtstack.chunjun.connector.jdbc.options.JdbcSinkOptions.WRITE_MODE;
import static com.dtstack.chunjun.connector.jdbc.options.JdbcSourceOptions.SCAN_CUSTOM_SQL;
import static com.dtstack.chunjun.connector.jdbc.options.JdbcSourceOptions.SCAN_WHERE;
import static com.dtstack.chunjun.lookup.options.LookupOptions.LOOKUP_ASYNC_TIMEOUT;
Expand Down Expand Up @@ -171,6 +172,7 @@ protected JdbcConfig getSinkConnectionConfig(
jdbcConfig.setFlushIntervalMills(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL));
jdbcConfig.setParallelism(readableConfig.get(SINK_PARALLELISM));
jdbcConfig.setSemantic(readableConfig.get(SINK_SEMANTIC));
jdbcConfig.setMode(readableConfig.get(WRITE_MODE));

if (StringUtils.isNotEmpty(readableConfig.get(SINK_PRE_SQL))) {
jdbcConfig.setPreSql(Arrays.asList(readableConfig.get(SINK_PRE_SQL).split(";")));
Expand All @@ -180,7 +182,9 @@ protected JdbcConfig getSinkConnectionConfig(
}

List<String> keyFields = new ArrayList<>();
schema.getPrimaryKey().ifPresent(item -> keyFields.add(item.getName()));
if (schema.getPrimaryKey().isPresent()) {
keyFields = schema.getPrimaryKey().get().getColumns();
}

jdbcConfig.setUniqueKey(keyFields);
resetTableInfo(jdbcConfig);
Expand Down Expand Up @@ -307,6 +311,7 @@ public Set<ConfigOption<?>> optionalOptions() {
optionalOptions.add(SINK_SEMANTIC);
optionalOptions.add(SINK_PRE_SQL);
optionalOptions.add(SINK_POST_SQL);
optionalOptions.add(WRITE_MODE);
return optionalOptions;
}

Expand Down

0 comments on commit 804d200

Please sign in to comment.