Skip to content

Commit

Permalink
[hotfix][tests] Fix oracle e2e test with ARM docker image without a d…
Browse files Browse the repository at this point in the history
…atabase initial
  • Loading branch information
GOODBOY008 committed Oct 10, 2024
1 parent 4b13c49 commit de89471
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import org.apache.flink.cdc.common.test.utils.JdbcProxy;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.oracle.source.OracleSourceITCase;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;

import io.debezium.relational.TableId;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -32,26 +34,36 @@
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.CONNECTOR_PWD;
import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.CONNECTOR_USER;
import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.ORACLE_DATABASE;
import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.TEST_PWD;
import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.TEST_USER;
import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.*;
import static org.junit.Assert.assertNotNull;

/** End-to-end tests for oracle-cdc connector uber jar. */
public class OracleE2eITCase extends FlinkContainerTestEnvironment {

private static final Logger LOG = LoggerFactory.getLogger(OracleE2eITCase.class);
protected static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
private static final String ORACLE_DRIVER_CLASS = "oracle.jdbc.driver.OracleDriver";
private static final String INTER_CONTAINER_ORACLE_ALIAS = "oracle";
private static final Path oracleCdcJar = TestUtils.getResource("oracle-cdc-connector.jar");
Expand Down Expand Up @@ -87,6 +99,7 @@ public void before() {
.withReuse(true);

Startables.deepStart(Stream.of(oracle)).join();
initializeOracleTable("oracle_inventory");
LOG.info("Containers are started.");
}

Expand Down Expand Up @@ -197,7 +210,76 @@ public void testOracleCDC() throws Exception {
300000L);
}

private Connection getOracleJdbcConnection() throws SQLException {
private static Connection getOracleJdbcConnection() throws SQLException {
return DriverManager.getConnection(oracle.getJdbcUrl(), TEST_USER, TEST_PWD);
}

private static void initializeOracleTable(String sqlFile) {
final String ddlFile = String.format("ddl/%s.sql", sqlFile);
final URL ddlTestFile = OracleSourceITCase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
connection.setAutoCommit(true);
// region Drop all user tables in Debezium schema
listTables(connection)
.forEach(
tableId -> {
try {
statement.execute(
"DROP TABLE "
+ String.join(
".",
tableId.schema(),
tableId.table()));
} catch (SQLException e) {
LOG.warn("drop table error, table:{}", tableId, e);
}
});
// endregion

final List<String> statements =
Arrays.stream(
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("--") && !x.isEmpty())
.map(
x -> {
final Matcher m =
COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.collect(Collectors.joining("\n"))
.split(";"))
.collect(Collectors.toList());

for (String stmt : statements) {
statement.execute(stmt);
}
} catch (SQLException | IOException | URISyntaxException e) {
throw new RuntimeException(e);
}
}

// ------------------ utils -----------------------
protected static List<TableId> listTables(Connection connection) {

Set<TableId> tableIdSet = new HashSet<>();
String queryTablesSql =
"SELECT OWNER ,TABLE_NAME,TABLESPACE_NAME FROM ALL_TABLES \n"
+ "WHERE TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX') "
+ "AND NESTED = 'NO' AND TABLE_NAME NOT IN (SELECT PARENT_TABLE_NAME FROM ALL_NESTED_TABLES)";
try {
ResultSet resultSet = connection.createStatement().executeQuery(queryTablesSql);
while (resultSet.next()) {
String schemaName = resultSet.getString(1);
String tableName = resultSet.getString(2);
TableId tableId = new TableId(ORACLE_DATABASE, schemaName, tableName);
tableIdSet.add(tableId);
}
} catch (SQLException e) {
LOG.warn(" SQL execute error, sql:{}", queryTablesSql, e);
}
return new ArrayList<>(tableIdSet);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: customer
-- ----------------------------------------------------------------------------------------------------------------

-- Create and populate products and category tables using a single insert with many rows
CREATE TABLE DEBEZIUM.PRODUCTS (
ID NUMBER(9, 0) NOT NULL,
NAME VARCHAR(255) NOT NULL,
DESCRIPTION VARCHAR(512),
WEIGHT FLOAT,
PRIMARY KEY(ID)
);
CREATE TABLE DEBEZIUM.CATEGORY (
ID NUMBER(9, 0) NOT NULL,
CATEGORY_NAME VARCHAR(255),
PRIMARY KEY(ID)
);

ALTER TABLE DEBEZIUM.PRODUCTS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE DEBEZIUM.CATEGORY ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;


INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (101,'scooter','Small 2-wheel scooter',3.14);
INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (102,'car battery','12V car battery',8.1);
INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (103,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (104,'hammer','12oz carpenters hammer',0.75);
INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (105,'hammer','14oz carpenters hammer',0.875);
INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (106,'hammer','16oz carpenters hammer',1.0);
INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (107,'rocks','box of assorted rocks',5.3);
INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (108,'jacket','water resistent black wind breaker',0.1);
INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (109,'spare tire','24 inch spare tire',22.2);

0 comments on commit de89471

Please sign in to comment.