Skip to content

Commit

Permalink
[FLINK-35638] Refactor OceanBase test cases and remove dependency on …
Browse files Browse the repository at this point in the history
…host network
  • Loading branch information
whhe committed Jun 27, 2024
1 parent 0723009 commit bedd253
Show file tree
Hide file tree
Showing 9 changed files with 584 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@

package org.apache.flink.cdc.connectors.oceanbase;

import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.test.util.AbstractTestBase;

import org.junit.ClassRule;
import org.junit.Rule;

import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
Expand All @@ -43,51 +41,13 @@
import static org.junit.Assert.assertTrue;

/** Basic class for testing OceanBase source. */
public abstract class OceanBaseTestBase extends TestLogger {
public abstract class OceanBaseTestBase extends AbstractTestBase {

private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");

protected static final int DEFAULT_PARALLELISM = 4;

@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.build());

@ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;

protected final String compatibleMode;
protected final String username;
protected final String password;
protected final String hostname;
protected final int port;
protected final String logProxyHost;
protected final int logProxyPort;
protected final String tenant;

public OceanBaseTestBase(
String compatibleMode,
String username,
String password,
String hostname,
int port,
String logProxyHost,
int logProxyPort,
String tenant) {
this.compatibleMode = compatibleMode;
this.username = username;
this.password = password;
this.hostname = hostname;
this.port = port;
this.logProxyHost = logProxyHost;
this.logProxyPort = logProxyPort;
this.tenant = tenant;
}
protected abstract OceanBaseCdcMetadata metadata();

protected String commonOptionsString() {
return String.format(
Expand All @@ -96,8 +56,14 @@ protected String commonOptionsString() {
+ " 'password' = '%s', "
+ " 'hostname' = '%s', "
+ " 'port' = '%s', "
+ " 'compatible-mode' = '%s'",
username, password, hostname, port, compatibleMode);
+ " 'compatible-mode' = '%s', "
+ " 'jdbc.driver' = '%s'",
metadata().getUsername(),
metadata().getPassword(),
metadata().getHostname(),
metadata().getPort(),
metadata().getCompatibleMode(),
metadata().getDriverClass());
}

protected String logProxyOptionsString() {
Expand All @@ -106,7 +72,9 @@ protected String logProxyOptionsString() {
+ " 'tenant-name' = '%s',"
+ " 'logproxy.host' = '%s',"
+ " 'logproxy.port' = '%s'",
tenant, logProxyHost, logProxyPort);
metadata().getTenantName(),
metadata().getLogProxyHost(),
metadata().getLogProxyPort());
}

protected String initialOptionsString() {
Expand All @@ -120,7 +88,10 @@ protected String snapshotOptionsString() {
return " 'scan.startup.mode' = 'snapshot', " + commonOptionsString();
}

protected abstract Connection getJdbcConnection() throws SQLException;
protected Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
metadata().getJdbcUrl(), metadata().getUsername(), metadata().getPassword());
}

protected void setGlobalTimeZone(String serverTimeZone) throws SQLException {
try (Connection connection = getJdbcConnection();
Expand All @@ -130,7 +101,8 @@ protected void setGlobalTimeZone(String serverTimeZone) throws SQLException {
}

protected void initializeTable(String sqlFile) {
final String ddlFile = String.format("ddl/%s/%s.sql", compatibleMode, sqlFile);
final String ddlFile =
String.format("ddl/%s/%s.sql", metadata().getCompatibleMode(), sqlFile);
final URL ddlTestFile = getClass().getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try (Connection connection = getJdbcConnection();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.oceanbase;

import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;

/** Utils to help test. */
@SuppressWarnings("resource")
public class OceanBaseTestUtils {

private static final Logger LOG = LoggerFactory.getLogger(OceanBaseTestUtils.class);

public static final Network NETWORK = Network.newNetwork();

private static final String SUPPORTED_OB_VERSION = "4.2.1.6-106000012024042515";

private static final String SYS_PASSWORD = "123456";
private static final String TEST_PASSWORD = "654321";

public static OceanBaseContainer createOceanBaseContainer(String initSqlFile) {
return createOceanBaseContainer("mini", initSqlFile);
}

public static OceanBaseContainer createOceanBaseContainer(String mode, String initSqlFile) {
return new OceanBaseContainer(SUPPORTED_OB_VERSION)
.withNetwork(NETWORK)
.withMode(mode)
.withSysPassword(SYS_PASSWORD)
.withTenantPassword(TEST_PASSWORD)
.withSetupSQL(initSqlFile)
.withStartupTimeout(Duration.ofMinutes(4))
.withLogConsumer(new Slf4jLogConsumer(LOG));
}

public static LogProxyContainer createLogProxyContainer() {
return new LogProxyContainer()
.withNetwork(NETWORK)
.withSysPassword(SYS_PASSWORD)
.withStartupTimeout(Duration.ofMinutes(1))
.withLogConsumer(new Slf4jLogConsumer(LOG));
}

public static String queryRootServiceList(Connection connection) {
try (Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery("SHOW PARAMETERS LIKE 'rootservice_list'");
return rs.next() ? rs.getString("VALUE") : null;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

public static String queryConfigUrl(Connection connection) {
try (Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery("SHOW PARAMETERS LIKE 'obconfig_url'");
return rs.next() ? rs.getString("VALUE") : null;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
package org.apache.flink.cdc.connectors.oceanbase.table;

import org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestBase;
import org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils;
import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseMySQLCdcMetadata;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
Expand All @@ -27,31 +32,22 @@
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.MountableFile;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

/** Integration tests for OceanBase MySQL mode table source. */
@RunWith(Parameterized.class)
public class OceanBaseMySQLModeITCase extends OceanBaseTestBase {

private static final Logger LOG = LoggerFactory.getLogger(OceanBaseMySQLModeITCase.class);
Expand All @@ -62,38 +58,34 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase {
StreamTableEnvironment.create(
env, EnvironmentSettings.newInstance().inStreamingMode().build());

private static final String NETWORK_MODE = "host";
private static final String OB_SYS_PASSWORD = "123456";

@ClassRule
public static final GenericContainer<?> OB_SERVER =
new GenericContainer<>("oceanbase/oceanbase-ce:4.2.0.0")
.withNetworkMode(NETWORK_MODE)
.withEnv("MODE", "slim")
.withEnv("OB_ROOT_PASSWORD", OB_SYS_PASSWORD)
.withEnv("OB_DATAFILE_SIZE", "1G")
.withEnv("OB_LOG_DISK_SIZE", "4G")
.withCopyFileToContainer(
MountableFile.forClasspathResource("ddl/mysql/docker_init.sql"),
"/root/boot/init.d/init.sql")
.waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
.withStartupTimeout(Duration.ofMinutes(4))
.withLogConsumer(new Slf4jLogConsumer(LOG));

@ClassRule
public static final GenericContainer<?> LOG_PROXY =
new GenericContainer<>("whhe/oblogproxy:1.1.3_4x")
.withNetworkMode(NETWORK_MODE)
.withEnv("OB_SYS_PASSWORD", OB_SYS_PASSWORD)
.waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
.withStartupTimeout(Duration.ofMinutes(1))
.withLogConsumer(new Slf4jLogConsumer(LOG));
private static final OceanBaseContainer OB_SERVER =
OceanBaseTestUtils.createOceanBaseContainer("ddl/mysql/docker_init.sql");

private static final LogProxyContainer LOG_PROXY = OceanBaseTestUtils.createLogProxyContainer();

private static final OceanBaseCdcMetadata METADATA =
new OceanBaseMySQLCdcMetadata(OB_SERVER, LOG_PROXY);

private static String rsList;

@BeforeClass
public static void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(OB_SERVER, LOG_PROXY)).join();
LOG.info("Containers are started.");

try (Connection connection =
DriverManager.getConnection(
METADATA.getJdbcUrl(), METADATA.getUsername(), METADATA.getPassword())) {
rsList = OceanBaseTestUtils.queryRootServiceList(connection);
} catch (SQLException e) {
throw new RuntimeException(e);
}

if (rsList == null) {
throw new RuntimeException("rootservice_list not found");
}
LOG.info("Got 'rootservice_list': {}", rsList);
}

@AfterClass
Expand All @@ -103,56 +95,25 @@ public static void stopContainers() {
LOG.info("Containers are stopped.");
}

@Override
protected OceanBaseCdcMetadata metadata() {
return METADATA;
}

@Before
public void before() {
TestValuesTableFactory.clearAllData();
env.enableCheckpointing(1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
}

private final String rsList;

public OceanBaseMySQLModeITCase(
String username,
String password,
String hostname,
int port,
String logProxyHost,
int logProxyPort,
String tenant,
String rsList) {
super("mysql", username, password, hostname, port, logProxyHost, logProxyPort, tenant);
this.rsList = rsList;
}

@Parameterized.Parameters
public static List<Object[]> parameters() {
return Collections.singletonList(
new Object[] {
"root@test",
"123456",
"127.0.0.1",
2881,
"127.0.0.1",
2983,
"test",
"127.0.0.1:2882:2881"
});
}

@Override
protected String logProxyOptionsString() {
return super.logProxyOptionsString()
+ " , "
+ String.format(" 'rootserver-list' = '%s'", rsList);
}

@Override
protected Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
"jdbc:mysql://" + hostname + ":" + port + "/?useSSL=false", username, password);
}

@Test
public void testTableList() throws Exception {
initializeTable("inventory");
Expand Down Expand Up @@ -312,6 +273,8 @@ public void testMetadataColumns() throws Exception {

waitForSinkSize("sink", snapshotSize + 1);

String tenant = metadata().getTenantName();

List<String> expected =
Arrays.asList(
"+I("
Expand Down
Loading

0 comments on commit bedd253

Please sign in to comment.