diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/options/JdbcSinkOptions.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/options/JdbcSinkOptions.java index f5e7d1ce8a..6e1c009181 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/options/JdbcSinkOptions.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/options/JdbcSinkOptions.java @@ -18,6 +18,8 @@ package com.dtstack.chunjun.connector.jdbc.options; +import com.dtstack.chunjun.enums.EWriteMode; + import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; @@ -52,4 +54,10 @@ public class JdbcSinkOptions { .stringType() .noDefaultValue() .withDescription("sink.post-sql"); + + public static final ConfigOption WRITE_MODE = + ConfigOptions.key("write-mode") + .stringType() + .defaultValue(EWriteMode.INSERT.name()) + .withDescription("write mode"); } diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcDynamicTableSink.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcDynamicTableSink.java index 030d91c1f0..472b922dd7 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcDynamicTableSink.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcDynamicTableSink.java @@ -22,7 +22,6 @@ import com.dtstack.chunjun.config.TypeConfig; import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig; import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; -import com.dtstack.chunjun.enums.EWriteMode; import com.dtstack.chunjun.sink.DtOutputFormatSinkFunction; import org.apache.flink.table.catalog.Column; @@ -108,10 +107,6 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { }); jdbcConfig.setColumn(columnList); - jdbcConfig.setMode( - (CollectionUtil.isNullOrEmpty(jdbcConfig.getUniqueKey())) - ? EWriteMode.INSERT.name() - : EWriteMode.UPDATE.name()); builder.setColumnNameList(columnNameList); builder.setColumnTypeList(columnTypeList); diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java index 938a66b45b..d40b489136 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java @@ -44,6 +44,7 @@ import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.CollectionUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; @@ -52,8 +53,10 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -103,6 +106,23 @@ protected void openInternal(int taskNumber, int numTasks) { log.info("updateKey = {}", JsonUtil.toJson(tableIndex)); } } + if (!CollectionUtil.isNullOrEmpty(jdbcConfig.getUniqueKey())) { + final RowType rowType = rowConverter.getRowType(); + List fields = rowType.getFields(); + List keyRowFields = + new ArrayList<>(jdbcConfig.getUniqueKey().size()); + for (String name : jdbcConfig.getUniqueKey()) { + for (RowType.RowField field : fields) { + if (Objects.equals(name, field.getName())) { + keyRowFields.add(field); + break; + } + } + } + RowType keyRowType = new RowType(keyRowFields); + setKeyRowType(keyRowType); + setKeyRowConverter(jdbcDialect.getRowConverter(keyRowType)); + } buildStatementWrapper(); log.info("subTask[{}}] wait finished", taskNumber); diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/table/JdbcDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/table/JdbcDynamicTableFactory.java index 12e16ff88e..7d7f360827 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/table/JdbcDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/table/JdbcDynamicTableFactory.java @@ -69,6 +69,7 @@ import static com.dtstack.chunjun.connector.jdbc.options.JdbcSinkOptions.SINK_POST_SQL; import static com.dtstack.chunjun.connector.jdbc.options.JdbcSinkOptions.SINK_PRE_SQL; import static com.dtstack.chunjun.connector.jdbc.options.JdbcSinkOptions.SINK_SEMANTIC; +import static com.dtstack.chunjun.connector.jdbc.options.JdbcSinkOptions.WRITE_MODE; import static com.dtstack.chunjun.connector.jdbc.options.JdbcSourceOptions.SCAN_CUSTOM_SQL; import static com.dtstack.chunjun.connector.jdbc.options.JdbcSourceOptions.SCAN_WHERE; import static com.dtstack.chunjun.lookup.options.LookupOptions.LOOKUP_ASYNC_TIMEOUT; @@ -171,6 +172,7 @@ protected JdbcConfig getSinkConnectionConfig( jdbcConfig.setFlushIntervalMills(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL)); jdbcConfig.setParallelism(readableConfig.get(SINK_PARALLELISM)); jdbcConfig.setSemantic(readableConfig.get(SINK_SEMANTIC)); + jdbcConfig.setMode(readableConfig.get(WRITE_MODE)); if (StringUtils.isNotEmpty(readableConfig.get(SINK_PRE_SQL))) { jdbcConfig.setPreSql(Arrays.asList(readableConfig.get(SINK_PRE_SQL).split(";"))); @@ -180,7 +182,9 @@ protected JdbcConfig getSinkConnectionConfig( } List keyFields = new ArrayList<>(); - schema.getPrimaryKey().ifPresent(item -> keyFields.add(item.getName())); + if (schema.getPrimaryKey().isPresent()) { + keyFields = schema.getPrimaryKey().get().getColumns(); + } jdbcConfig.setUniqueKey(keyFields); resetTableInfo(jdbcConfig); @@ -307,6 +311,7 @@ public Set> optionalOptions() { optionalOptions.add(SINK_SEMANTIC); optionalOptions.add(SINK_PRE_SQL); optionalOptions.add(SINK_POST_SQL); + optionalOptions.add(WRITE_MODE); return optionalOptions; }