Skip to content

Commit

Permalink
[FLINK-34639][cdc][oceanbase] Support debezium deserializer in OceanB…
Browse files Browse the repository at this point in the history
…ase source connector (apache#3124)
  • Loading branch information
whhe authored Apr 25, 2024
1 parent bdca0e3 commit 05281e5
Show file tree
Hide file tree
Showing 34 changed files with 2,656 additions and 1,762 deletions.
265 changes: 165 additions & 100 deletions docs/content.zh/docs/connectors/legacy-flink-cdc-sources/oceanbase-cdc.md

Large diffs are not rendered by default.

207 changes: 131 additions & 76 deletions docs/content/docs/connectors/legacy-flink-cdc-sources/oceanbase-cdc.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ limitations under the License.
<version>${revision}</version>
</parent>
<properties>
<!-- Because of oceanbase docker image can not expose port quickly, so we need to specify testcontainers version to 1.15.3 -->
<jdbc.version>1.15.3</jdbc.version>
<oblogclient.version>1.1.2</oblogclient.version>
</properties>
<modelVersion>4.0.0</modelVersion>

Expand All @@ -34,6 +33,13 @@ limitations under the License.
<packaging>jar</packaging>

<dependencies>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-base</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Debezium dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand All @@ -47,12 +53,6 @@ limitations under the License.
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-common</artifactId>
<version>${project.version}</version>
</dependency>

<!-- OceanBase Log Client -->
<dependency>
<groupId>com.oceanbase</groupId>
Expand All @@ -64,7 +64,13 @@ limitations under the License.
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
<version>8.0.27</version>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- test dependencies on Flink -->
Expand Down Expand Up @@ -151,7 +157,7 @@ limitations under the License.
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
<version>${jdbc.version}</version>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@
package org.apache.flink.cdc.connectors.oceanbase;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction;
import org.apache.flink.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
import org.apache.flink.cdc.connectors.oceanbase.table.StartupMode;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.config.ObReaderConfig;
import com.oceanbase.clogproxy.client.util.ClientIdGenerator;
import org.apache.commons.lang3.StringUtils;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
Expand All @@ -47,7 +49,7 @@ public static <T> Builder<T> builder() {
public static class Builder<T> {

// common config
private StartupMode startupMode;
private StartupOptions startupOptions;
private String username;
private String password;
private String tenantName;
Expand All @@ -73,11 +75,12 @@ public static class Builder<T> {
private String configUrl;
private String workingMode;
private Properties obcdcProperties;
private Properties debeziumProperties;

private OceanBaseDeserializationSchema<T> deserializer;
private DebeziumDeserializationSchema<T> deserializer;

public Builder<T> startupMode(StartupMode startupMode) {
this.startupMode = startupMode;
public Builder<T> startupOptions(StartupOptions startupOptions) {
this.startupOptions = startupOptions;
return this;
}

Expand Down Expand Up @@ -151,7 +154,7 @@ public Builder<T> logProxyHost(String logProxyHost) {
return this;
}

public Builder<T> logProxyPort(int logProxyPort) {
public Builder<T> logProxyPort(Integer logProxyPort) {
this.logProxyPort = logProxyPort;
return this;
}
Expand Down Expand Up @@ -186,23 +189,44 @@ public Builder<T> obcdcProperties(Properties obcdcProperties) {
return this;
}

public Builder<T> deserializer(OceanBaseDeserializationSchema<T> deserializer) {
public Builder<T> debeziumProperties(Properties debeziumProperties) {
this.debeziumProperties = debeziumProperties;
return this;
}

public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
this.deserializer = deserializer;
return this;
}

public SourceFunction<T> build() {
switch (startupMode) {
case INITIAL:
checkNotNull(hostname, "hostname shouldn't be null on startup mode 'initial'");
checkNotNull(port, "port shouldn't be null on startup mode 'initial'");
checkNotNull(
compatibleMode,
"compatibleMode shouldn't be null on startup mode 'initial'");
checkNotNull(
jdbcDriver, "jdbcDriver shouldn't be null on startup mode 'initial'");
startupTimestamp = 0L;
checkNotNull(username, "username shouldn't be null");
checkNotNull(password, "password shouldn't be null");
checkNotNull(hostname, "hostname shouldn't be null");
checkNotNull(port, "port shouldn't be null");

if (startupOptions == null) {
startupOptions = StartupOptions.initial();
}
if (compatibleMode == null) {
compatibleMode = "mysql";
}
if (jdbcDriver == null) {
jdbcDriver = "com.mysql.cj.jdbc.Driver";
}

if (connectTimeout == null) {
connectTimeout = Duration.ofSeconds(30);
}

if (serverTimeZone == null) {
serverTimeZone = ZoneId.systemDefault().getId();
}

switch (startupOptions.startupMode) {
case SNAPSHOT:
break;
case INITIAL:
case LATEST_OFFSET:
startupTimestamp = 0L;
break;
Expand All @@ -213,15 +237,9 @@ public SourceFunction<T> build() {
break;
default:
throw new UnsupportedOperationException(
startupMode + " mode is not supported.");
startupOptions.startupMode + " mode is not supported.");
}

if (!startupMode.equals(StartupMode.INITIAL)
&& (StringUtils.isNotEmpty(databaseName)
|| StringUtils.isNotEmpty(tableName))) {
throw new IllegalArgumentException(
"If startup mode is not 'INITIAL', 'database-name' and 'table-name' must not be configured");
}
if (StringUtils.isNotEmpty(databaseName) || StringUtils.isNotEmpty(tableName)) {
if (StringUtils.isEmpty(databaseName) || StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException(
Expand All @@ -233,57 +251,51 @@ public SourceFunction<T> build() {
"'database-name', 'table-name' or 'table-list' should be configured");
}

if (serverTimeZone == null) {
serverTimeZone = "+00:00";
}
ClientConf clientConf = null;
ObReaderConfig obReaderConfig = null;

if (connectTimeout == null) {
connectTimeout = Duration.ofSeconds(30);
}
if (!startupOptions.isSnapshotOnly()) {

if (logProxyClientId == null) {
logProxyClientId =
String.format(
"%s_%s_%s",
ClientIdGenerator.generate(),
Thread.currentThread().getId(),
checkNotNull(tenantName));
}
ClientConf clientConf =
ClientConf.builder()
.clientId(logProxyClientId)
.connectTimeoutMs((int) connectTimeout.toMillis())
.build();

ObReaderConfig obReaderConfig = new ObReaderConfig();
if (StringUtils.isNotEmpty(rsList)) {
obReaderConfig.setRsList(rsList);
}
if (StringUtils.isNotEmpty(configUrl)) {
obReaderConfig.setClusterUrl(configUrl);
}
if (StringUtils.isNotEmpty(workingMode)) {
obReaderConfig.setWorkingMode(workingMode);
}
obReaderConfig.setUsername(username);
obReaderConfig.setPassword(password);
obReaderConfig.setStartTimestamp(startupTimestamp);
obReaderConfig.setTimezone(serverTimeZone);

if (obcdcProperties != null && !obcdcProperties.isEmpty()) {
Map<String, String> extraConfigs = new HashMap<>();
obcdcProperties.forEach((k, v) -> extraConfigs.put(k.toString(), v.toString()));
obReaderConfig.setExtraConfigs(extraConfigs);
checkNotNull(logProxyHost);
checkNotNull(logProxyPort);
checkNotNull(tenantName);

obReaderConfig = new ObReaderConfig();
if (StringUtils.isNotEmpty(rsList)) {
obReaderConfig.setRsList(rsList);
}
if (StringUtils.isNotEmpty(configUrl)) {
obReaderConfig.setClusterUrl(configUrl);
}
if (StringUtils.isNotEmpty(workingMode)) {
obReaderConfig.setWorkingMode(workingMode);
}
obReaderConfig.setUsername(username);
obReaderConfig.setPassword(password);
obReaderConfig.setStartTimestamp(startupTimestamp);
obReaderConfig.setTimezone(
DateTimeFormatter.ofPattern("xxx")
.format(
ZoneId.of(serverTimeZone)
.getRules()
.getOffset(Instant.now())));

if (obcdcProperties != null && !obcdcProperties.isEmpty()) {
Map<String, String> extraConfigs = new HashMap<>();
obcdcProperties.forEach((k, v) -> extraConfigs.put(k.toString(), v.toString()));
obReaderConfig.setExtraConfigs(extraConfigs);
}
}

return new OceanBaseRichSourceFunction<>(
StartupMode.INITIAL.equals(startupMode),
startupOptions,
username,
password,
tenantName,
databaseName,
tableName,
tableList,
serverTimeZone,
connectTimeout,
hostname,
port,
Expand All @@ -292,8 +304,9 @@ public SourceFunction<T> build() {
jdbcProperties,
logProxyHost,
logProxyPort,
clientConf,
logProxyClientId,
obReaderConfig,
debeziumProperties,
deserializer);
}
}
Expand Down
Loading

0 comments on commit 05281e5

Please sign in to comment.