From de894719e6c657996c8e0c3c3aded423b2d60d3a Mon Sep 17 00:00:00 2001 From: gongzhongqiang Date: Thu, 10 Oct 2024 20:17:46 +0800 Subject: [PATCH] [hotfix][tests] Fix oracle e2e test with ARM docker image without a database initial --- .../cdc/connectors/tests/OracleE2eITCase.java | 94 +++++++++++++++++-- .../test/resources/ddl/oracle_inventory.sql | 55 +++++++++++ 2 files changed, 143 insertions(+), 6 deletions(-) create mode 100644 flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oracle_inventory.sql diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OracleE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OracleE2eITCase.java index f2a6e99798..1cecb57220 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OracleE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OracleE2eITCase.java @@ -19,8 +19,10 @@ import org.apache.flink.cdc.common.test.utils.JdbcProxy; import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.oracle.source.OracleSourceITCase; import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; +import io.debezium.relational.TableId; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -32,26 +34,36 @@ import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.DockerImageName; +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.CONNECTOR_PWD; -import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.CONNECTOR_USER; -import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.ORACLE_DATABASE; -import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.TEST_PWD; -import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.TEST_USER; +import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.*; +import static org.junit.Assert.assertNotNull; /** End-to-end tests for oracle-cdc connector uber jar. */ public class OracleE2eITCase extends FlinkContainerTestEnvironment { private static final Logger LOG = LoggerFactory.getLogger(OracleE2eITCase.class); + protected static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); private static final String ORACLE_DRIVER_CLASS = "oracle.jdbc.driver.OracleDriver"; private static final String INTER_CONTAINER_ORACLE_ALIAS = "oracle"; private static final Path oracleCdcJar = TestUtils.getResource("oracle-cdc-connector.jar"); @@ -87,6 +99,7 @@ public void before() { .withReuse(true); Startables.deepStart(Stream.of(oracle)).join(); + initializeOracleTable("oracle_inventory"); LOG.info("Containers are started."); } @@ -197,7 +210,76 @@ public void testOracleCDC() throws Exception { 300000L); } - private Connection getOracleJdbcConnection() throws SQLException { + private static Connection getOracleJdbcConnection() throws SQLException { return DriverManager.getConnection(oracle.getJdbcUrl(), TEST_USER, TEST_PWD); } + + private static void initializeOracleTable(String sqlFile) { + final String ddlFile = String.format("ddl/%s.sql", sqlFile); + final URL ddlTestFile = OracleSourceITCase.class.getClassLoader().getResource(ddlFile); + assertNotNull("Cannot locate " + ddlFile, ddlTestFile); + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + connection.setAutoCommit(true); + // region Drop all user tables in Debezium schema + listTables(connection) + .forEach( + tableId -> { + try { + statement.execute( + "DROP TABLE " + + String.join( + ".", + tableId.schema(), + tableId.table())); + } catch (SQLException e) { + LOG.warn("drop table error, table:{}", tableId, e); + } + }); + // endregion + + final List statements = + Arrays.stream( + Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream() + .map(String::trim) + .filter(x -> !x.startsWith("--") && !x.isEmpty()) + .map( + x -> { + final Matcher m = + COMMENT_PATTERN.matcher(x); + return m.matches() ? m.group(1) : x; + }) + .collect(Collectors.joining("\n")) + .split(";")) + .collect(Collectors.toList()); + + for (String stmt : statements) { + statement.execute(stmt); + } + } catch (SQLException | IOException | URISyntaxException e) { + throw new RuntimeException(e); + } + } + + // ------------------ utils ----------------------- + protected static List listTables(Connection connection) { + + Set tableIdSet = new HashSet<>(); + String queryTablesSql = + "SELECT OWNER ,TABLE_NAME,TABLESPACE_NAME FROM ALL_TABLES \n" + + "WHERE TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX') " + + "AND NESTED = 'NO' AND TABLE_NAME NOT IN (SELECT PARENT_TABLE_NAME FROM ALL_NESTED_TABLES)"; + try { + ResultSet resultSet = connection.createStatement().executeQuery(queryTablesSql); + while (resultSet.next()) { + String schemaName = resultSet.getString(1); + String tableName = resultSet.getString(2); + TableId tableId = new TableId(ORACLE_DATABASE, schemaName, tableName); + tableIdSet.add(tableId); + } + } catch (SQLException e) { + LOG.warn(" SQL execute error, sql:{}", queryTablesSql, e); + } + return new ArrayList<>(tableIdSet); + } } diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oracle_inventory.sql b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oracle_inventory.sql new file mode 100644 index 0000000000..ff928c4360 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oracle_inventory.sql @@ -0,0 +1,55 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: customer +-- ---------------------------------------------------------------------------------------------------------------- + +-- Create and populate products and category tables using a single insert with many rows +CREATE TABLE DEBEZIUM.PRODUCTS ( + ID NUMBER(9, 0) NOT NULL, + NAME VARCHAR(255) NOT NULL, + DESCRIPTION VARCHAR(512), + WEIGHT FLOAT, + PRIMARY KEY(ID) +); +CREATE TABLE DEBEZIUM.CATEGORY ( + ID NUMBER(9, 0) NOT NULL, + CATEGORY_NAME VARCHAR(255), + PRIMARY KEY(ID) +); + +ALTER TABLE DEBEZIUM.PRODUCTS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; +ALTER TABLE DEBEZIUM.CATEGORY ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + + +INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (101,'scooter','Small 2-wheel scooter',3.14); +INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (102,'car battery','12V car battery',8.1); +INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (103,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8); +INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (104,'hammer','12oz carpenters hammer',0.75); +INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (105,'hammer','14oz carpenters hammer',0.875); +INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (106,'hammer','16oz carpenters hammer',1.0); +INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (107,'rocks','box of assorted rocks',5.3); +INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (108,'jacket','water resistent black wind breaker',0.1); +INSERT INTO DEBEZIUM.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (109,'spare tire','24 inch spare tire',22.2);