From bd216626e67d347029bc3955536c1ca8a7a8812b Mon Sep 17 00:00:00 2001 From: He Wang Date: Mon, 1 Apr 2024 16:47:39 +0800 Subject: [PATCH] update oblogclient and rollback tableWhiteList setting --- .../flink/cdc/connectors/oceanbase/OceanBaseSource.java | 6 +++--- .../oceanbase/source/OceanBaseRichSourceFunction.java | 6 ++++++ pom.xml | 2 +- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSource.java index 4a27c5fabec..3ba4aba0a0b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSource.java @@ -25,7 +25,7 @@ import com.oceanbase.clogproxy.client.config.ClientConf; import com.oceanbase.clogproxy.client.config.ObReaderConfig; -import com.oceanbase.clogproxy.client.util.ClientIdGenerator; +import com.oceanbase.clogproxy.client.util.ClientUtil; import org.apache.commons.lang3.StringUtils; import java.time.Duration; @@ -262,13 +262,14 @@ public SourceFunction build() { logProxyClientId = String.format( "%s_%s_%s", - ClientIdGenerator.generate(), + ClientUtil.generateClientId(), Thread.currentThread().getId(), tenantName); } clientConf = ClientConf.builder() .clientId(logProxyClientId) + .maxReconnectTimes(0) .connectTimeoutMs((int) connectTimeout.toMillis()) .build(); @@ -282,7 +283,6 @@ public SourceFunction build() { if (StringUtils.isNotEmpty(workingMode)) { obReaderConfig.setWorkingMode(workingMode); } - obReaderConfig.setTableWhiteList(tenantName + ".*.*"); obReaderConfig.setUsername(username); obReaderConfig.setPassword(password); obReaderConfig.setStartTimestamp(startupTimestamp); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java index d253b00a0c6..efd6ee51da5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java @@ -70,6 +70,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -274,6 +275,11 @@ private void initTableWhiteList() { LOG.info("Table list: {}", localTableSet); this.tableSet = localTableSet; + // for some 4.x versions, it will be treated as 'tenant.*.*' + this.obReaderConfig.setTableWhiteList( + localTableSet.stream() + .map(tableId -> String.format("%s.%s", tenantName, tableId.toString())) + .collect(Collectors.joining("|"))); } private TableSchema getTableSchema(TableId tableId) { diff --git a/pom.xml b/pom.xml index e75c4a931c3..19ed89b3f70 100644 --- a/pom.xml +++ b/pom.xml @@ -85,7 +85,7 @@ limitations under the License. 1.7.36 2.17.1 2.4.2 - 1.1.0 + 1.1.1 3.12.0 2.7.0 5.10.1