Skip to content

Commit

Permalink
update oblogclient and rollback tableWhiteList setting
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Apr 1, 2024
1 parent 0630819 commit bd21662
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.config.ObReaderConfig;
import com.oceanbase.clogproxy.client.util.ClientIdGenerator;
import com.oceanbase.clogproxy.client.util.ClientUtil;
import org.apache.commons.lang3.StringUtils;

import java.time.Duration;
Expand Down Expand Up @@ -262,13 +262,14 @@ public SourceFunction<T> build() {
logProxyClientId =
String.format(
"%s_%s_%s",
ClientIdGenerator.generate(),
ClientUtil.generateClientId(),
Thread.currentThread().getId(),
tenantName);
}
clientConf =
ClientConf.builder()
.clientId(logProxyClientId)
.maxReconnectTimes(0)
.connectTimeoutMs((int) connectTimeout.toMillis())
.build();

Expand All @@ -282,7 +283,6 @@ public SourceFunction<T> build() {
if (StringUtils.isNotEmpty(workingMode)) {
obReaderConfig.setWorkingMode(workingMode);
}
obReaderConfig.setTableWhiteList(tenantName + ".*.*");
obReaderConfig.setUsername(username);
obReaderConfig.setPassword(password);
obReaderConfig.setStartTimestamp(startupTimestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -274,6 +275,11 @@ private void initTableWhiteList() {

LOG.info("Table list: {}", localTableSet);
this.tableSet = localTableSet;
// for some 4.x versions, it will be treated as 'tenant.*.*'
this.obReaderConfig.setTableWhiteList(
localTableSet.stream()
.map(tableId -> String.format("%s.%s", tenantName, tableId.toString()))
.collect(Collectors.joining("|")));
}

private TableSchema getTableSchema(TableId tableId) {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ limitations under the License.
<slf4j.version>1.7.36</slf4j.version>
<log4j.version>2.17.1</log4j.version>
<spotless.version>2.4.2</spotless.version>
<oblogclient.version>1.1.0</oblogclient.version>
<oblogclient.version>1.1.1</oblogclient.version>
<commons-lang3.version>3.12.0</commons-lang3.version>
<json-path.version>2.7.0</json-path.version>
<junit5.version>5.10.1</junit5.version>
Expand Down

0 comments on commit bd21662

Please sign in to comment.