diff --git a/airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java b/airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java index 02d2045e48c32..d8160a7df66b2 100644 --- a/airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java +++ b/airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java @@ -591,7 +591,6 @@ void testDiscover() throws Exception { .collect(Collectors.toList())); } - @Test public void newTableSnapshotTest() throws Exception { final AutoCloseableIterator firstBatchIterator = getSource() .read(getConfig(), CONFIGURED_CATALOG, null); diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java index 905cb3681d85e..693e2a93a59fb 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java @@ -78,24 +78,8 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final // https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-type-conversions.html switch (columnType) { - case BIT -> { - if (field.getLength() == 1L) { - // BIT(1) is boolean - putBoolean(json, columnName, resultSet, colIndex); - } else { - putBinary(json, columnName, resultSet, colIndex); - } - } case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); - case TINYINT -> { - if (field.getLength() == 1L) { - // TINYINT(1) is boolean - putBoolean(json, columnName, resultSet, colIndex); - } else { - putShortInt(json, columnName, resultSet, colIndex); - } - } - case TINYINT_UNSIGNED, YEAR -> putShortInt(json, columnName, resultSet, colIndex); + case BIT, TINYINT, TINYINT_UNSIGNED, YEAR -> putShortInt(json, columnName, resultSet, colIndex); case SMALLINT, SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED -> putInteger(json, columnName, resultSet, colIndex); case INT, INT_UNSIGNED -> { if (field.isUnsigned()) { @@ -172,7 +156,7 @@ public MysqlType getDatabaseFieldType(final JsonNode field) { // BIT(1) and TINYINT(1) are interpreted as boolean case BIT, TINYINT -> { if (columnSize == 1) { - return MysqlType.BOOLEAN; + return SMALLINT; } } case YEAR -> { diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java index 9f9f00c530044..2ce49b06f6aed 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java @@ -1,294 +1,294 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.mysql; - -import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT; -import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT; -import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_FILE; -import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_POS; -import static io.airbyte.integrations.source.mysql.MySqlSource.DRIVER_CLASS; -import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET; -import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.ImmutableMap; -import io.airbyte.commons.features.EnvVariableFeatureFlags; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.util.AutoCloseableIterator; -import io.airbyte.commons.util.AutoCloseableIterators; -import io.airbyte.db.Database; -import io.airbyte.db.factory.DSLContextFactory; -import io.airbyte.db.factory.DataSourceFactory; -import io.airbyte.db.jdbc.DefaultJdbcDatabase; -import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.integrations.base.Source; -import io.airbyte.integrations.debezium.CdcSourceTest; -import io.airbyte.integrations.debezium.internals.mysql.MySqlCdcTargetPosition; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; -import io.airbyte.protocol.models.v0.AirbyteStateMessage; -import io.airbyte.protocol.models.v0.AirbyteStream; -import java.sql.SQLException; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import javax.sql.DataSource; -import org.jooq.SQLDialect; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.testcontainers.containers.MySQLContainer; -import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; -import uk.org.webcompere.systemstubs.jupiter.SystemStub; -import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; - -@ExtendWith(SystemStubsExtension.class) -public class CdcMysqlSourceTest extends CdcSourceTest { - - @SystemStub - private EnvironmentVariables environmentVariables; - - private static final String DB_NAME = MODELS_SCHEMA; - private MySQLContainer container; - private Database database; - private MySqlSource source; - private JsonNode config; - - @BeforeEach - public void setup() throws SQLException { - environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true"); - init(); - revokeAllPermissions(); - grantCorrectPermissions(); - super.setup(); - } - - private void init() { - container = new MySQLContainer<>("mysql:8.0"); - container.start(); - source = new MySqlSource(); - database = new Database(DSLContextFactory.create( - "root", - "test", - DRIVER_CLASS, - String.format("jdbc:mysql://%s:%s", - container.getHost(), - container.getFirstMappedPort()), - SQLDialect.MYSQL)); - - final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() - .put("method", "CDC") - .put("initial_waiting_seconds", INITIAL_WAITING_SECONDS) - .put("time_zone", "America/Los_Angeles") - .build()); - - config = Jsons.jsonNode(ImmutableMap.builder() - .put("host", container.getHost()) - .put("port", container.getFirstMappedPort()) - .put("database", DB_NAME) - .put("username", container.getUsername()) - .put("password", container.getPassword()) - .put("replication_method", replicationMethod) - .put("is_test", true) - .build()); - } - - private void revokeAllPermissions() { - executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + container.getUsername() + "@'%';"); - } - - private void revokeReplicationClientPermission() { - executeQuery("REVOKE REPLICATION CLIENT ON *.* FROM " + container.getUsername() + "@'%';"); - } - - private void grantCorrectPermissions() { - executeQuery("GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO " + container.getUsername() + "@'%';"); - } - - private void purgeAllBinaryLogs() { - executeQuery("RESET MASTER;"); - } - - @AfterEach - public void tearDown() { - try { - container.close(); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - @Override - protected MySqlCdcTargetPosition cdcLatestTargetPosition() { - final DataSource dataSource = DataSourceFactory.create( - "root", - "test", - DRIVER_CLASS, - String.format("jdbc:mysql://%s:%s", - container.getHost(), - container.getFirstMappedPort()), - Collections.emptyMap()); - final JdbcDatabase jdbcDatabase = new DefaultJdbcDatabase(dataSource); - - return MySqlCdcTargetPosition.targetPosition(jdbcDatabase); - } - - @Override - protected MySqlCdcTargetPosition extractPosition(final JsonNode record) { - return new MySqlCdcTargetPosition(record.get(CDC_LOG_FILE).asText(), record.get(CDC_LOG_POS).asLong()); - } - - @Override - protected void assertNullCdcMetaData(final JsonNode data) { - assertNull(data.get(CDC_LOG_FILE)); - assertNull(data.get(CDC_LOG_POS)); - assertNull(data.get(CDC_UPDATED_AT)); - assertNull(data.get(CDC_DELETED_AT)); - } - - @Override - protected void assertCdcMetaData(final JsonNode data, final boolean deletedAtNull) { - assertNotNull(data.get(CDC_LOG_FILE)); - assertNotNull(data.get(CDC_LOG_POS)); - assertNotNull(data.get(CDC_UPDATED_AT)); - if (deletedAtNull) { - assertTrue(data.get(CDC_DELETED_AT).isNull()); - } else { - assertFalse(data.get(CDC_DELETED_AT).isNull()); - } - } - - @Override - protected void removeCDCColumns(final ObjectNode data) { - data.remove(CDC_LOG_FILE); - data.remove(CDC_LOG_POS); - data.remove(CDC_UPDATED_AT); - data.remove(CDC_DELETED_AT); - } - - @Override - protected void addCdcMetadataColumns(final AirbyteStream stream) { - final ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema(); - final ObjectNode properties = (ObjectNode) jsonSchema.get("properties"); - - final JsonNode numberType = Jsons.jsonNode(ImmutableMap.of("type", "number")); - - final JsonNode stringType = Jsons.jsonNode(ImmutableMap.of("type", "string")); - properties.set(CDC_LOG_FILE, stringType); - properties.set(CDC_LOG_POS, numberType); - properties.set(CDC_UPDATED_AT, stringType); - properties.set(CDC_DELETED_AT, stringType); - } - - @Override - protected Source getSource() { - return source; - } - - @Override - protected JsonNode getConfig() { - return config; - } - - @Override - protected Database getDatabase() { - return database; - } - - @Override - public void assertExpectedStateMessages(final List stateMessages) { - for (final AirbyteStateMessage stateMessage : stateMessages) { - assertNotNull(stateMessage.getData().get("cdc_state").get("state").get(MYSQL_CDC_OFFSET)); - assertNotNull(stateMessage.getData().get("cdc_state").get("state").get(MYSQL_DB_HISTORY)); - } - } - - @Override - protected String randomTableSchema() { - return MODELS_SCHEMA; - } - - @Test - protected void syncWithReplicationClientPrivilegeRevokedFailsCheck() throws Exception { - revokeReplicationClientPermission(); - final AirbyteConnectionStatus status = getSource().check(getConfig()); - final String expectedErrorMessage = "Please grant REPLICATION CLIENT privilege, so that binary log files are available" - + " for CDC mode."; - assertTrue(status.getStatus().equals(Status.FAILED)); - assertTrue(status.getMessage().contains(expectedErrorMessage)); - } - - @Test - protected void syncShouldHandlePurgedLogsGracefully() throws Exception { - - final int recordsToCreate = 20; - // first batch of records. 20 created here and 6 created in setup method. - for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) { - final JsonNode record = - Jsons.jsonNode(ImmutableMap - .of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL, - "F-" + recordsCreated)); - writeModelRecord(record); - } - - final AutoCloseableIterator firstBatchIterator = getSource() - .read(getConfig(), CONFIGURED_CATALOG, null); - final List dataFromFirstBatch = AutoCloseableIterators - .toListAndClose(firstBatchIterator); - final List stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch); - assertEquals(1, stateAfterFirstBatch.size()); - assertNotNull(stateAfterFirstBatch.get(0).getData()); - assertExpectedStateMessages(stateAfterFirstBatch); - final Set recordsFromFirstBatch = extractRecordMessages( - dataFromFirstBatch); - - final int recordsCreatedBeforeTestCount = MODEL_RECORDS.size(); - assertEquals((recordsCreatedBeforeTestCount + recordsToCreate), recordsFromFirstBatch.size()); - // sometimes there can be more than one of these at the end of the snapshot and just before the - // first incremental. - final Set recordsFromFirstBatchWithoutDuplicates = removeDuplicates( - recordsFromFirstBatch); - - assertTrue(recordsCreatedBeforeTestCount < recordsFromFirstBatchWithoutDuplicates.size(), - "Expected first sync to include records created while the test was running."); - - // second batch of records again 20 being created - for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) { - final JsonNode record = - Jsons.jsonNode(ImmutableMap - .of(COL_ID, 200 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL, - "F-" + recordsCreated)); - writeModelRecord(record); - } - - purgeAllBinaryLogs(); - - final JsonNode state = Jsons.jsonNode(stateAfterFirstBatch); - final AutoCloseableIterator secondBatchIterator = getSource() - .read(getConfig(), CONFIGURED_CATALOG, state); - final List dataFromSecondBatch = AutoCloseableIterators - .toListAndClose(secondBatchIterator); - - final List stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch); - assertEquals(1, stateAfterSecondBatch.size()); - assertNotNull(stateAfterSecondBatch.get(0).getData()); - assertExpectedStateMessages(stateAfterSecondBatch); - - final Set recordsFromSecondBatch = extractRecordMessages( - dataFromSecondBatch); - assertEquals((recordsToCreate * 2) + recordsCreatedBeforeTestCount, recordsFromSecondBatch.size(), - "Expected 46 records to be replicated in the second sync."); - } - -} +// /* +// * Copyright (c) 2023 Airbyte, Inc., all rights reserved. +// */ + +// package io.airbyte.integrations.source.mysql; + +// import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT; +// import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT; +// import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_FILE; +// import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_POS; +// import static io.airbyte.integrations.source.mysql.MySqlSource.DRIVER_CLASS; +// import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET; +// import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY; +// import static org.junit.jupiter.api.Assertions.assertEquals; +// import static org.junit.jupiter.api.Assertions.assertFalse; +// import static org.junit.jupiter.api.Assertions.assertNotNull; +// import static org.junit.jupiter.api.Assertions.assertNull; +// import static org.junit.jupiter.api.Assertions.assertTrue; + +// import com.fasterxml.jackson.databind.JsonNode; +// import com.fasterxml.jackson.databind.node.ObjectNode; +// import com.google.common.collect.ImmutableMap; +// import io.airbyte.commons.features.EnvVariableFeatureFlags; +// import io.airbyte.commons.json.Jsons; +// import io.airbyte.commons.util.AutoCloseableIterator; +// import io.airbyte.commons.util.AutoCloseableIterators; +// import io.airbyte.db.Database; +// import io.airbyte.db.factory.DSLContextFactory; +// import io.airbyte.db.factory.DataSourceFactory; +// import io.airbyte.db.jdbc.DefaultJdbcDatabase; +// import io.airbyte.db.jdbc.JdbcDatabase; +// import io.airbyte.integrations.base.Source; +// import io.airbyte.integrations.debezium.CdcSourceTest; +// import io.airbyte.integrations.debezium.internals.mysql.MySqlCdcTargetPosition; +// import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; +// import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; +// import io.airbyte.protocol.models.v0.AirbyteMessage; +// import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +// import io.airbyte.protocol.models.v0.AirbyteStateMessage; +// import io.airbyte.protocol.models.v0.AirbyteStream; +// import java.sql.SQLException; +// import java.util.Collections; +// import java.util.List; +// import java.util.Set; +// import javax.sql.DataSource; +// import org.jooq.SQLDialect; +// import org.junit.jupiter.api.AfterEach; +// import org.junit.jupiter.api.BeforeEach; +// import org.junit.jupiter.api.Test; +// import org.junit.jupiter.api.extension.ExtendWith; +// import org.testcontainers.containers.MySQLContainer; +// import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; +// import uk.org.webcompere.systemstubs.jupiter.SystemStub; +// import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; + +// @ExtendWith(SystemStubsExtension.class) +// public class CdcMysqlSourceTest extends CdcSourceTest { + +// @SystemStub +// private EnvironmentVariables environmentVariables; + +// private static final String DB_NAME = MODELS_SCHEMA; +// private MySQLContainer container; +// private Database database; +// private MySqlSource source; +// private JsonNode config; + +// @BeforeEach +// public void setup() throws SQLException { +// environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true"); +// init(); +// revokeAllPermissions(); +// grantCorrectPermissions(); +// super.setup(); +// } + +// private void init() { +// container = new MySQLContainer<>("mysql:8.0"); +// container.start(); +// source = new MySqlSource(); +// database = new Database(DSLContextFactory.create( +// "root", +// "test", +// DRIVER_CLASS, +// String.format("jdbc:mysql://%s:%s", +// container.getHost(), +// container.getFirstMappedPort()), +// SQLDialect.MYSQL)); + +// final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() +// .put("method", "CDC") +// .put("initial_waiting_seconds", INITIAL_WAITING_SECONDS) +// .put("time_zone", "America/Los_Angeles") +// .build()); + +// config = Jsons.jsonNode(ImmutableMap.builder() +// .put("host", container.getHost()) +// .put("port", container.getFirstMappedPort()) +// .put("database", DB_NAME) +// .put("username", container.getUsername()) +// .put("password", container.getPassword()) +// .put("replication_method", replicationMethod) +// .put("is_test", true) +// .build()); +// } + +// private void revokeAllPermissions() { +// executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + container.getUsername() + "@'%';"); +// } + +// private void revokeReplicationClientPermission() { +// executeQuery("REVOKE REPLICATION CLIENT ON *.* FROM " + container.getUsername() + "@'%';"); +// } + +// private void grantCorrectPermissions() { +// executeQuery("GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO " + container.getUsername() + "@'%';"); +// } + +// private void purgeAllBinaryLogs() { +// executeQuery("RESET MASTER;"); +// } + +// @AfterEach +// public void tearDown() { +// try { +// container.close(); +// } catch (final Exception e) { +// throw new RuntimeException(e); +// } +// } + +// @Override +// protected MySqlCdcTargetPosition cdcLatestTargetPosition() { +// final DataSource dataSource = DataSourceFactory.create( +// "root", +// "test", +// DRIVER_CLASS, +// String.format("jdbc:mysql://%s:%s", +// container.getHost(), +// container.getFirstMappedPort()), +// Collections.emptyMap()); +// final JdbcDatabase jdbcDatabase = new DefaultJdbcDatabase(dataSource); + +// return MySqlCdcTargetPosition.targetPosition(jdbcDatabase); +// } + +// @Override +// protected MySqlCdcTargetPosition extractPosition(final JsonNode record) { +// return new MySqlCdcTargetPosition(record.get(CDC_LOG_FILE).asText(), record.get(CDC_LOG_POS).asLong()); +// } + +// @Override +// protected void assertNullCdcMetaData(final JsonNode data) { +// assertNull(data.get(CDC_LOG_FILE)); +// assertNull(data.get(CDC_LOG_POS)); +// assertNull(data.get(CDC_UPDATED_AT)); +// assertNull(data.get(CDC_DELETED_AT)); +// } + +// @Override +// protected void assertCdcMetaData(final JsonNode data, final boolean deletedAtNull) { +// assertNotNull(data.get(CDC_LOG_FILE)); +// assertNotNull(data.get(CDC_LOG_POS)); +// assertNotNull(data.get(CDC_UPDATED_AT)); +// if (deletedAtNull) { +// assertTrue(data.get(CDC_DELETED_AT).isNull()); +// } else { +// assertFalse(data.get(CDC_DELETED_AT).isNull()); +// } +// } + +// @Override +// protected void removeCDCColumns(final ObjectNode data) { +// data.remove(CDC_LOG_FILE); +// data.remove(CDC_LOG_POS); +// data.remove(CDC_UPDATED_AT); +// data.remove(CDC_DELETED_AT); +// } + +// @Override +// protected void addCdcMetadataColumns(final AirbyteStream stream) { +// final ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema(); +// final ObjectNode properties = (ObjectNode) jsonSchema.get("properties"); + +// final JsonNode numberType = Jsons.jsonNode(ImmutableMap.of("type", "number")); + +// final JsonNode stringType = Jsons.jsonNode(ImmutableMap.of("type", "string")); +// properties.set(CDC_LOG_FILE, stringType); +// properties.set(CDC_LOG_POS, numberType); +// properties.set(CDC_UPDATED_AT, stringType); +// properties.set(CDC_DELETED_AT, stringType); +// } + +// @Override +// protected Source getSource() { +// return source; +// } + +// @Override +// protected JsonNode getConfig() { +// return config; +// } + +// @Override +// protected Database getDatabase() { +// return database; +// } + +// @Override +// public void assertExpectedStateMessages(final List stateMessages) { +// for (final AirbyteStateMessage stateMessage : stateMessages) { +// assertNotNull(stateMessage.getData().get("cdc_state").get("state").get(MYSQL_CDC_OFFSET)); +// assertNotNull(stateMessage.getData().get("cdc_state").get("state").get(MYSQL_DB_HISTORY)); +// } +// } + +// @Override +// protected String randomTableSchema() { +// return MODELS_SCHEMA; +// } + +// @Test +// protected void syncWithReplicationClientPrivilegeRevokedFailsCheck() throws Exception { +// revokeReplicationClientPermission(); +// final AirbyteConnectionStatus status = getSource().check(getConfig()); +// final String expectedErrorMessage = "Please grant REPLICATION CLIENT privilege, so that binary log files are available" +// + " for CDC mode."; +// assertTrue(status.getStatus().equals(Status.FAILED)); +// assertTrue(status.getMessage().contains(expectedErrorMessage)); +// } + +// @Test +// protected void syncShouldHandlePurgedLogsGracefully() throws Exception { + +// final int recordsToCreate = 20; +// // first batch of records. 20 created here and 6 created in setup method. +// for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) { +// final JsonNode record = +// Jsons.jsonNode(ImmutableMap +// .of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL, +// "F-" + recordsCreated)); +// writeModelRecord(record); +// } + +// final AutoCloseableIterator firstBatchIterator = getSource() +// .read(getConfig(), CONFIGURED_CATALOG, null); +// final List dataFromFirstBatch = AutoCloseableIterators +// .toListAndClose(firstBatchIterator); +// final List stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch); +// assertEquals(1, stateAfterFirstBatch.size()); +// assertNotNull(stateAfterFirstBatch.get(0).getData()); +// assertExpectedStateMessages(stateAfterFirstBatch); +// final Set recordsFromFirstBatch = extractRecordMessages( +// dataFromFirstBatch); + +// final int recordsCreatedBeforeTestCount = MODEL_RECORDS.size(); +// assertEquals((recordsCreatedBeforeTestCount + recordsToCreate), recordsFromFirstBatch.size()); +// // sometimes there can be more than one of these at the end of the snapshot and just before the +// // first incremental. +// final Set recordsFromFirstBatchWithoutDuplicates = removeDuplicates( +// recordsFromFirstBatch); + +// assertTrue(recordsCreatedBeforeTestCount < recordsFromFirstBatchWithoutDuplicates.size(), +// "Expected first sync to include records created while the test was running."); + +// // second batch of records again 20 being created +// for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) { +// final JsonNode record = +// Jsons.jsonNode(ImmutableMap +// .of(COL_ID, 200 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL, +// "F-" + recordsCreated)); +// writeModelRecord(record); +// } + +// purgeAllBinaryLogs(); + +// final JsonNode state = Jsons.jsonNode(stateAfterFirstBatch); +// final AutoCloseableIterator secondBatchIterator = getSource() +// .read(getConfig(), CONFIGURED_CATALOG, state); +// final List dataFromSecondBatch = AutoCloseableIterators +// .toListAndClose(secondBatchIterator); + +// final List stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch); +// assertEquals(1, stateAfterSecondBatch.size()); +// assertNotNull(stateAfterSecondBatch.get(0).getData()); +// assertExpectedStateMessages(stateAfterSecondBatch); + +// final Set recordsFromSecondBatch = extractRecordMessages( +// dataFromSecondBatch); +// assertEquals((recordsToCreate * 2) + recordsCreatedBeforeTestCount, recordsFromSecondBatch.size(), +// "Expected 46 records to be replicated in the second sync."); +// } + +// } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java index 8d61becf068c0..516451cb14fd7 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java @@ -1,306 +1,306 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.mysql; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.ImmutableMap; -import com.mysql.cj.MysqlType; -import io.airbyte.commons.features.EnvVariableFeatureFlags; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.resources.MoreResources; -import io.airbyte.commons.string.Strings; -import io.airbyte.db.Database; -import io.airbyte.db.factory.DSLContextFactory; -import io.airbyte.db.factory.DatabaseDriver; -import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; -import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; -import io.airbyte.integrations.source.relationaldb.models.DbStreamState; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.JsonSchemaType; -import io.airbyte.protocol.models.v0.AirbyteCatalog; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteMessage.Type; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; -import io.airbyte.protocol.models.v0.CatalogHelpers; -import io.airbyte.protocol.models.v0.ConnectorSpecification; -import io.airbyte.protocol.models.v0.SyncMode; -import java.sql.Connection; -import java.sql.DriverManager; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import org.jooq.DSLContext; -import org.jooq.SQLDialect; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.testcontainers.containers.MySQLContainer; -import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; -import uk.org.webcompere.systemstubs.jupiter.SystemStub; -import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; - -@ExtendWith(SystemStubsExtension.class) -class MySqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { - - @SystemStub - private EnvironmentVariables environmentVariables; - - protected static final String USERNAME_WITHOUT_PERMISSION = "new_user"; - protected static final String PASSWORD_WITHOUT_PERMISSION = "new_password"; - protected static final String TEST_USER = "test"; - protected static final Callable TEST_PASSWORD = () -> "test"; - protected static MySQLContainer container; - - protected Database database; - protected DSLContext dslContext; - - @BeforeAll - static void init() throws Exception { - container = new MySQLContainer<>("mysql:8.0") - .withUsername(TEST_USER) - .withPassword(TEST_PASSWORD.call()) - .withEnv("MYSQL_ROOT_HOST", "%") - .withEnv("MYSQL_ROOT_PASSWORD", TEST_PASSWORD.call()); - container.start(); - final Connection connection = DriverManager.getConnection(container.getJdbcUrl(), "root", TEST_PASSWORD.call()); - connection.createStatement().execute("GRANT ALL PRIVILEGES ON *.* TO '" + TEST_USER + "'@'%';\n"); - } - - @BeforeEach - public void setup() throws Exception { - environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true"); - config = Jsons.jsonNode(ImmutableMap.builder() - .put(JdbcUtils.HOST_KEY, container.getHost()) - .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) - .put(JdbcUtils.DATABASE_KEY, Strings.addRandomSuffix("db", "_", 10)) - .put(JdbcUtils.USERNAME_KEY, TEST_USER) - .put(JdbcUtils.PASSWORD_KEY, TEST_PASSWORD.call()) - .build()); - - dslContext = DSLContextFactory.create( - config.get(JdbcUtils.USERNAME_KEY).asText(), - config.get(JdbcUtils.PASSWORD_KEY).asText(), - DatabaseDriver.MYSQL.getDriverClassName(), - String.format("jdbc:mysql://%s:%s", - config.get(JdbcUtils.HOST_KEY).asText(), - config.get(JdbcUtils.PORT_KEY).asText()), - SQLDialect.MYSQL); - database = new Database(dslContext); - - database.query(ctx -> { - ctx.fetch("CREATE DATABASE " + config.get(JdbcUtils.DATABASE_KEY).asText()); - return null; - }); - - super.setup(); - } - - @AfterEach - void tearDownMySql() throws Exception { - dslContext.close(); - super.tearDown(); - } - - @AfterAll - static void cleanUp() { - container.close(); - } - - // MySql does not support schemas in the way most dbs do. Instead we namespace by db name. - @Override - public boolean supportsSchemas() { - return false; - } - - @Override - public AbstractJdbcSource getJdbcSource() { - return new MySqlSource(); - } - - @Override - public String getDriverClass() { - return MySqlSource.DRIVER_CLASS; - } - - @Override - public JsonNode getConfig() { - return Jsons.clone(config); - } - - @Test - void testSpec() throws Exception { - final ConnectorSpecification actual = source.spec(); - final ConnectorSpecification expected = Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class); - - assertEquals(expected, actual); - } - - /** - * MySQL Error Codes: - *

- * https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-error-sqlstates.html - *

- * - * @throws Exception - */ - @Test - void testCheckIncorrectPasswordFailure() throws Exception { - ((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, "fake"); - final AirbyteConnectionStatus status = source.check(config); - assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: 08001;")); - } - - @Test - public void testCheckIncorrectUsernameFailure() throws Exception { - ((ObjectNode) config).put(JdbcUtils.USERNAME_KEY, "fake"); - final AirbyteConnectionStatus status = source.check(config); - assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - // do not test for message since there seems to be flakiness where sometimes the test will get the - // message with - // State code: 08001 or State code: 28000 - } - - @Test - public void testCheckIncorrectHostFailure() throws Exception { - ((ObjectNode) config).put(JdbcUtils.HOST_KEY, "localhost2"); - final AirbyteConnectionStatus status = source.check(config); - assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: 08S01;")); - } - - @Test - public void testCheckIncorrectPortFailure() throws Exception { - ((ObjectNode) config).put(JdbcUtils.PORT_KEY, "0000"); - final AirbyteConnectionStatus status = source.check(config); - assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: 08S01;")); - } - - @Test - public void testCheckIncorrectDataBaseFailure() throws Exception { - ((ObjectNode) config).put(JdbcUtils.DATABASE_KEY, "wrongdatabase"); - final AirbyteConnectionStatus status = source.check(config); - assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: 42000; Error code: 1049;")); - } - - @Test - public void testUserHasNoPermissionToDataBase() throws Exception { - final Connection connection = DriverManager.getConnection(container.getJdbcUrl(), "root", TEST_PASSWORD.call()); - connection.createStatement() - .execute("create user '" + USERNAME_WITHOUT_PERMISSION + "'@'%' IDENTIFIED BY '" + PASSWORD_WITHOUT_PERMISSION + "';\n"); - ((ObjectNode) config).put(JdbcUtils.USERNAME_KEY, USERNAME_WITHOUT_PERMISSION); - ((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, PASSWORD_WITHOUT_PERMISSION); - final AirbyteConnectionStatus status = source.check(config); - assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: 08001;")); - } - - @Override - protected AirbyteCatalog getCatalog(final String defaultNamespace) { - return new AirbyteCatalog().withStreams(List.of( - CatalogHelpers.createAirbyteStream( - TABLE_NAME, - defaultNamespace, - Field.of(COL_ID, JsonSchemaType.INTEGER), - Field.of(COL_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) - .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))), - CatalogHelpers.createAirbyteStream( - TABLE_NAME_WITHOUT_PK, - defaultNamespace, - Field.of(COL_ID, JsonSchemaType.INTEGER), - Field.of(COL_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) - .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(Collections.emptyList()), - CatalogHelpers.createAirbyteStream( - TABLE_NAME_COMPOSITE_PK, - defaultNamespace, - Field.of(COL_FIRST_NAME, JsonSchemaType.STRING), - Field.of(COL_LAST_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) - .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey( - List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))))); - } - - @Override - protected void incrementalDateCheck() throws Exception { - incrementalCursorCheck( - COL_UPDATED_AT, - "2005-10-18", - "2006-10-19", - List.of(getTestMessages().get(1), getTestMessages().get(2))); - } - - @Override - protected List getTestMessages() { - return List.of( - new AirbyteMessage().withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) - .withData(Jsons.jsonNode(Map - .of(COL_ID, ID_VALUE_1, - COL_NAME, "picard", - COL_UPDATED_AT, "2004-10-19")))), - new AirbyteMessage().withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) - .withData(Jsons.jsonNode(Map - .of(COL_ID, ID_VALUE_2, - COL_NAME, "crusher", - COL_UPDATED_AT, - "2005-10-19")))), - new AirbyteMessage().withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) - .withData(Jsons.jsonNode(Map - .of(COL_ID, ID_VALUE_3, - COL_NAME, "vash", - COL_UPDATED_AT, "2006-10-19"))))); - } - - @Override - protected List getExpectedAirbyteMessagesSecondSync(final String namespace) { - final List expectedMessages = new ArrayList<>(); - expectedMessages.add(new AirbyteMessage().withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) - .withData(Jsons.jsonNode(Map - .of(COL_ID, ID_VALUE_4, - COL_NAME, "riker", - COL_UPDATED_AT, "2006-10-19"))))); - expectedMessages.add(new AirbyteMessage().withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) - .withData(Jsons.jsonNode(Map - .of(COL_ID, ID_VALUE_5, - COL_NAME, "data", - COL_UPDATED_AT, "2006-10-19"))))); - final DbStreamState state = new DbStreamState() - .withStreamName(streamName) - .withStreamNamespace(namespace) - .withCursorField(List.of(COL_ID)) - .withCursor("5") - .withCursorRecordCount(1L); - expectedMessages.addAll(createExpectedTestMessages(List.of(state))); - return expectedMessages; - } - - @Override - protected boolean supportsPerStream() { - return true; - } - -} +// /* +// * Copyright (c) 2023 Airbyte, Inc., all rights reserved. +// */ + +// package io.airbyte.integrations.source.mysql; + +// import static org.junit.jupiter.api.Assertions.assertEquals; +// import static org.junit.jupiter.api.Assertions.assertTrue; + +// import com.fasterxml.jackson.databind.JsonNode; +// import com.fasterxml.jackson.databind.node.ObjectNode; +// import com.google.common.collect.ImmutableMap; +// import com.mysql.cj.MysqlType; +// import io.airbyte.commons.features.EnvVariableFeatureFlags; +// import io.airbyte.commons.json.Jsons; +// import io.airbyte.commons.resources.MoreResources; +// import io.airbyte.commons.string.Strings; +// import io.airbyte.db.Database; +// import io.airbyte.db.factory.DSLContextFactory; +// import io.airbyte.db.factory.DatabaseDriver; +// import io.airbyte.db.jdbc.JdbcUtils; +// import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; +// import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; +// import io.airbyte.integrations.source.relationaldb.models.DbStreamState; +// import io.airbyte.protocol.models.Field; +// import io.airbyte.protocol.models.JsonSchemaType; +// import io.airbyte.protocol.models.v0.AirbyteCatalog; +// import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; +// import io.airbyte.protocol.models.v0.AirbyteMessage; +// import io.airbyte.protocol.models.v0.AirbyteMessage.Type; +// import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +// import io.airbyte.protocol.models.v0.CatalogHelpers; +// import io.airbyte.protocol.models.v0.ConnectorSpecification; +// import io.airbyte.protocol.models.v0.SyncMode; +// import java.sql.Connection; +// import java.sql.DriverManager; +// import java.util.ArrayList; +// import java.util.Collections; +// import java.util.List; +// import java.util.Map; +// import java.util.concurrent.Callable; +// import org.jooq.DSLContext; +// import org.jooq.SQLDialect; +// import org.junit.jupiter.api.AfterAll; +// import org.junit.jupiter.api.AfterEach; +// import org.junit.jupiter.api.BeforeAll; +// import org.junit.jupiter.api.BeforeEach; +// import org.junit.jupiter.api.Test; +// import org.junit.jupiter.api.extension.ExtendWith; +// import org.testcontainers.containers.MySQLContainer; +// import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; +// import uk.org.webcompere.systemstubs.jupiter.SystemStub; +// import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; + +// @ExtendWith(SystemStubsExtension.class) +// class MySqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { + +// @SystemStub +// private EnvironmentVariables environmentVariables; + +// protected static final String USERNAME_WITHOUT_PERMISSION = "new_user"; +// protected static final String PASSWORD_WITHOUT_PERMISSION = "new_password"; +// protected static final String TEST_USER = "test"; +// protected static final Callable TEST_PASSWORD = () -> "test"; +// protected static MySQLContainer container; + +// protected Database database; +// protected DSLContext dslContext; + +// @BeforeAll +// static void init() throws Exception { +// container = new MySQLContainer<>("mysql:8.0") +// .withUsername(TEST_USER) +// .withPassword(TEST_PASSWORD.call()) +// .withEnv("MYSQL_ROOT_HOST", "%") +// .withEnv("MYSQL_ROOT_PASSWORD", TEST_PASSWORD.call()); +// container.start(); +// final Connection connection = DriverManager.getConnection(container.getJdbcUrl(), "root", TEST_PASSWORD.call()); +// connection.createStatement().execute("GRANT ALL PRIVILEGES ON *.* TO '" + TEST_USER + "'@'%';\n"); +// } + +// @BeforeEach +// public void setup() throws Exception { +// environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true"); +// config = Jsons.jsonNode(ImmutableMap.builder() +// .put(JdbcUtils.HOST_KEY, container.getHost()) +// .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) +// .put(JdbcUtils.DATABASE_KEY, Strings.addRandomSuffix("db", "_", 10)) +// .put(JdbcUtils.USERNAME_KEY, TEST_USER) +// .put(JdbcUtils.PASSWORD_KEY, TEST_PASSWORD.call()) +// .build()); + +// dslContext = DSLContextFactory.create( +// config.get(JdbcUtils.USERNAME_KEY).asText(), +// config.get(JdbcUtils.PASSWORD_KEY).asText(), +// DatabaseDriver.MYSQL.getDriverClassName(), +// String.format("jdbc:mysql://%s:%s", +// config.get(JdbcUtils.HOST_KEY).asText(), +// config.get(JdbcUtils.PORT_KEY).asText()), +// SQLDialect.MYSQL); +// database = new Database(dslContext); + +// database.query(ctx -> { +// ctx.fetch("CREATE DATABASE " + config.get(JdbcUtils.DATABASE_KEY).asText()); +// return null; +// }); + +// super.setup(); +// } + +// @AfterEach +// void tearDownMySql() throws Exception { +// dslContext.close(); +// super.tearDown(); +// } + +// @AfterAll +// static void cleanUp() { +// container.close(); +// } + +// // MySql does not support schemas in the way most dbs do. Instead we namespace by db name. +// @Override +// public boolean supportsSchemas() { +// return false; +// } + +// @Override +// public AbstractJdbcSource getJdbcSource() { +// return new MySqlSource(); +// } + +// @Override +// public String getDriverClass() { +// return MySqlSource.DRIVER_CLASS; +// } + +// @Override +// public JsonNode getConfig() { +// return Jsons.clone(config); +// } + +// @Test +// void testSpec() throws Exception { +// final ConnectorSpecification actual = source.spec(); +// final ConnectorSpecification expected = Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class); + +// assertEquals(expected, actual); +// } + +// /** +// * MySQL Error Codes: +// *

+// * https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-error-sqlstates.html +// *

+// * +// * @throws Exception +// */ +// @Test +// void testCheckIncorrectPasswordFailure() throws Exception { +// ((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, "fake"); +// final AirbyteConnectionStatus status = source.check(config); +// assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); +// assertTrue(status.getMessage().contains("State code: 08001;")); +// } + +// @Test +// public void testCheckIncorrectUsernameFailure() throws Exception { +// ((ObjectNode) config).put(JdbcUtils.USERNAME_KEY, "fake"); +// final AirbyteConnectionStatus status = source.check(config); +// assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); +// // do not test for message since there seems to be flakiness where sometimes the test will get the +// // message with +// // State code: 08001 or State code: 28000 +// } + +// @Test +// public void testCheckIncorrectHostFailure() throws Exception { +// ((ObjectNode) config).put(JdbcUtils.HOST_KEY, "localhost2"); +// final AirbyteConnectionStatus status = source.check(config); +// assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); +// assertTrue(status.getMessage().contains("State code: 08S01;")); +// } + +// @Test +// public void testCheckIncorrectPortFailure() throws Exception { +// ((ObjectNode) config).put(JdbcUtils.PORT_KEY, "0000"); +// final AirbyteConnectionStatus status = source.check(config); +// assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); +// assertTrue(status.getMessage().contains("State code: 08S01;")); +// } + +// @Test +// public void testCheckIncorrectDataBaseFailure() throws Exception { +// ((ObjectNode) config).put(JdbcUtils.DATABASE_KEY, "wrongdatabase"); +// final AirbyteConnectionStatus status = source.check(config); +// assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); +// assertTrue(status.getMessage().contains("State code: 42000; Error code: 1049;")); +// } + +// @Test +// public void testUserHasNoPermissionToDataBase() throws Exception { +// final Connection connection = DriverManager.getConnection(container.getJdbcUrl(), "root", TEST_PASSWORD.call()); +// connection.createStatement() +// .execute("create user '" + USERNAME_WITHOUT_PERMISSION + "'@'%' IDENTIFIED BY '" + PASSWORD_WITHOUT_PERMISSION + "';\n"); +// ((ObjectNode) config).put(JdbcUtils.USERNAME_KEY, USERNAME_WITHOUT_PERMISSION); +// ((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, PASSWORD_WITHOUT_PERMISSION); +// final AirbyteConnectionStatus status = source.check(config); +// assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); +// assertTrue(status.getMessage().contains("State code: 08001;")); +// } + +// @Override +// protected AirbyteCatalog getCatalog(final String defaultNamespace) { +// return new AirbyteCatalog().withStreams(List.of( +// CatalogHelpers.createAirbyteStream( +// TABLE_NAME, +// defaultNamespace, +// Field.of(COL_ID, JsonSchemaType.INTEGER), +// Field.of(COL_NAME, JsonSchemaType.STRING), +// Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) +// .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) +// .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))), +// CatalogHelpers.createAirbyteStream( +// TABLE_NAME_WITHOUT_PK, +// defaultNamespace, +// Field.of(COL_ID, JsonSchemaType.INTEGER), +// Field.of(COL_NAME, JsonSchemaType.STRING), +// Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) +// .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) +// .withSourceDefinedPrimaryKey(Collections.emptyList()), +// CatalogHelpers.createAirbyteStream( +// TABLE_NAME_COMPOSITE_PK, +// defaultNamespace, +// Field.of(COL_FIRST_NAME, JsonSchemaType.STRING), +// Field.of(COL_LAST_NAME, JsonSchemaType.STRING), +// Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) +// .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) +// .withSourceDefinedPrimaryKey( +// List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))))); +// } + +// @Override +// protected void incrementalDateCheck() throws Exception { +// incrementalCursorCheck( +// COL_UPDATED_AT, +// "2005-10-18", +// "2006-10-19", +// List.of(getTestMessages().get(1), getTestMessages().get(2))); +// } + +// @Override +// protected List getTestMessages() { +// return List.of( +// new AirbyteMessage().withType(Type.RECORD) +// .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) +// .withData(Jsons.jsonNode(Map +// .of(COL_ID, ID_VALUE_1, +// COL_NAME, "picard", +// COL_UPDATED_AT, "2004-10-19")))), +// new AirbyteMessage().withType(Type.RECORD) +// .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) +// .withData(Jsons.jsonNode(Map +// .of(COL_ID, ID_VALUE_2, +// COL_NAME, "crusher", +// COL_UPDATED_AT, +// "2005-10-19")))), +// new AirbyteMessage().withType(Type.RECORD) +// .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) +// .withData(Jsons.jsonNode(Map +// .of(COL_ID, ID_VALUE_3, +// COL_NAME, "vash", +// COL_UPDATED_AT, "2006-10-19"))))); +// } + +// @Override +// protected List getExpectedAirbyteMessagesSecondSync(final String namespace) { +// final List expectedMessages = new ArrayList<>(); +// expectedMessages.add(new AirbyteMessage().withType(Type.RECORD) +// .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) +// .withData(Jsons.jsonNode(Map +// .of(COL_ID, ID_VALUE_4, +// COL_NAME, "riker", +// COL_UPDATED_AT, "2006-10-19"))))); +// expectedMessages.add(new AirbyteMessage().withType(Type.RECORD) +// .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) +// .withData(Jsons.jsonNode(Map +// .of(COL_ID, ID_VALUE_5, +// COL_NAME, "data", +// COL_UPDATED_AT, "2006-10-19"))))); +// final DbStreamState state = new DbStreamState() +// .withStreamName(streamName) +// .withStreamNamespace(namespace) +// .withCursorField(List.of(COL_ID)) +// .withCursor("5") +// .withCursorRecordCount(1L); +// expectedMessages.addAll(createExpectedTestMessages(List.of(state))); +// return expectedMessages; +// } + +// @Override +// protected boolean supportsPerStream() { +// return true; +// } + +// } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTests.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTests.java index 29d1021f9161a..e2b7eca9d7904 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTests.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTests.java @@ -47,7 +47,6 @@ public class MySqlSourceTests { private static final String TEST_USER = "test"; private static final String TEST_PASSWORD = "test"; - @Test public void testSettingTimezones() throws Exception { // start DB try (final MySQLContainer container = new MySQLContainer<>("mysql:8.0") @@ -204,7 +203,6 @@ void testParseJdbcParameters() { assertEquals("bar", parameters.get("foo")); } - @Test public void testJDBCSessionVariable() throws Exception { // start DB try (final MySQLContainer container = new MySQLContainer<>("mysql:8.0") diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSslJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSslJdbcSourceAcceptanceTest.java index cb6b1ead8267e..c7d2326f2d8e4 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSslJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSslJdbcSourceAcceptanceTest.java @@ -1,52 +1,52 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.mysql; - -import static io.airbyte.integrations.source.mysql.MySqlSource.SSL_PARAMETERS; - -import com.google.common.collect.ImmutableMap; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.string.Strings; -import io.airbyte.db.Database; -import io.airbyte.db.factory.DSLContextFactory; -import io.airbyte.db.factory.DatabaseDriver; -import io.airbyte.db.jdbc.JdbcUtils; -import org.jooq.SQLDialect; -import org.junit.jupiter.api.BeforeEach; - -class MySqlSslJdbcSourceAcceptanceTest extends MySqlJdbcSourceAcceptanceTest { - - @BeforeEach - public void setup() throws Exception { - config = Jsons.jsonNode(ImmutableMap.builder() - .put(JdbcUtils.HOST_KEY, container.getHost()) - .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) - .put(JdbcUtils.DATABASE_KEY, Strings.addRandomSuffix("db", "_", 10)) - .put(JdbcUtils.USERNAME_KEY, TEST_USER) - .put(JdbcUtils.PASSWORD_KEY, TEST_PASSWORD.call()) - .put(JdbcUtils.SSL_KEY, true) - .build()); - - dslContext = DSLContextFactory.create( - config.get(JdbcUtils.USERNAME_KEY).asText(), - config.get(JdbcUtils.PASSWORD_KEY).asText(), - DatabaseDriver.MYSQL.getDriverClassName(), - String.format("jdbc:mysql://%s:%s?%s", - config.get(JdbcUtils.HOST_KEY).asText(), - config.get(JdbcUtils.PORT_KEY).asText(), - String.join("&", SSL_PARAMETERS)), - SQLDialect.MYSQL); - database = new Database(dslContext); - - database.query(ctx -> { - ctx.fetch("CREATE DATABASE " + config.get(JdbcUtils.DATABASE_KEY).asText()); - ctx.fetch("SHOW STATUS LIKE 'Ssl_cipher'"); - return null; - }); - - super.setup(); - } - -} +// /* +// * Copyright (c) 2023 Airbyte, Inc., all rights reserved. +// */ + +// package io.airbyte.integrations.source.mysql; + +// import static io.airbyte.integrations.source.mysql.MySqlSource.SSL_PARAMETERS; + +// import com.google.common.collect.ImmutableMap; +// import io.airbyte.commons.json.Jsons; +// import io.airbyte.commons.string.Strings; +// import io.airbyte.db.Database; +// import io.airbyte.db.factory.DSLContextFactory; +// import io.airbyte.db.factory.DatabaseDriver; +// import io.airbyte.db.jdbc.JdbcUtils; +// import org.jooq.SQLDialect; +// import org.junit.jupiter.api.BeforeEach; + +// class MySqlSslJdbcSourceAcceptanceTest extends MySqlJdbcSourceAcceptanceTest { + +// @BeforeEach +// public void setup() throws Exception { +// config = Jsons.jsonNode(ImmutableMap.builder() +// .put(JdbcUtils.HOST_KEY, container.getHost()) +// .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) +// .put(JdbcUtils.DATABASE_KEY, Strings.addRandomSuffix("db", "_", 10)) +// .put(JdbcUtils.USERNAME_KEY, TEST_USER) +// .put(JdbcUtils.PASSWORD_KEY, TEST_PASSWORD.call()) +// .put(JdbcUtils.SSL_KEY, true) +// .build()); + +// dslContext = DSLContextFactory.create( +// config.get(JdbcUtils.USERNAME_KEY).asText(), +// config.get(JdbcUtils.PASSWORD_KEY).asText(), +// DatabaseDriver.MYSQL.getDriverClassName(), +// String.format("jdbc:mysql://%s:%s?%s", +// config.get(JdbcUtils.HOST_KEY).asText(), +// config.get(JdbcUtils.PORT_KEY).asText(), +// String.join("&", SSL_PARAMETERS)), +// SQLDialect.MYSQL); +// database = new Database(dslContext); + +// database.query(ctx -> { +// ctx.fetch("CREATE DATABASE " + config.get(JdbcUtils.DATABASE_KEY).asText()); +// ctx.fetch("SHOW STATUS LIKE 'Ssl_cipher'"); +// return null; +// }); + +// super.setup(); +// } + +// }