diff --git a/chunjun-connectors/chunjun-connector-sqlserver/src/main/java/com/dtstack/chunjun/connector/sqlserver/table/SqlserverDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-sqlserver/src/main/java/com/dtstack/chunjun/connector/sqlserver/table/SqlserverDynamicTableFactory.java index b7e2fba584..a64c8498fb 100644 --- a/chunjun-connectors/chunjun-connector-sqlserver/src/main/java/com/dtstack/chunjun/connector/sqlserver/table/SqlserverDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-sqlserver/src/main/java/com/dtstack/chunjun/connector/sqlserver/table/SqlserverDynamicTableFactory.java @@ -28,7 +28,9 @@ import com.dtstack.chunjun.connector.sqlserver.sink.SqlserverOutputFormat; import com.dtstack.chunjun.connector.sqlserver.source.SqlserverInputFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.commons.lang3.StringUtils; @@ -38,6 +40,8 @@ public class SqlserverDynamicTableFactory extends JdbcDynamicTableFactory { private static final String IDENTIFIER = "sqlserver-x"; + private JdbcConfig jdbcConfig; + @Override public String factoryIdentifier() { return IDENTIFIER; @@ -45,6 +49,11 @@ public String factoryIdentifier() { @Override protected JdbcDialect getDialect() { + if (jdbcConfig != null) { + return new SqlserverDialect( + jdbcConfig.isWithNoLock(), + jdbcConfig.getJdbcUrl().startsWith("jdbc:jtds:sqlserver")); + } return new SqlserverDialect(); } @@ -60,11 +69,24 @@ protected JdbcOutputFormatBuilder getOutputFormatBuilder() { @Override public DynamicTableSource createDynamicTableSource(Context context) { + final FactoryUtil.TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper(this, context); + this.jdbcConfig = getSourceConnectionConfig(helper.getOptions()); Map prop = context.getCatalogTable().getOptions(); prop.put("druid.validation-query", "SELECT 1"); return super.createDynamicTableSource(context); } + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + final FactoryUtil.TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper(this, context); + this.jdbcConfig = + getSinkConnectionConfig( + helper.getOptions(), context.getCatalogTable().getResolvedSchema()); + return super.createDynamicTableSink(context); + } + /** table字段有可能是[schema].[table]格式 需要转换为对应的schema 和 table 字段* */ @Override protected void resetTableInfo(JdbcConfig jdbcConfig) {