diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java
index d74c4dc89f..5dad3cb3c6 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java
@@ -24,7 +24,9 @@
 import org.opensearch.dataprepper.plugins.source.rds.leader.RdsApiStrategy;
 import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata;
 import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
+import org.opensearch.dataprepper.plugins.source.rds.resync.ResyncScheduler;
 import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager;
+import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager;
 import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;
 import org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientFactory;
 import org.opensearch.dataprepper.plugins.source.rds.stream.StreamScheduler;
@@ -64,6 +66,7 @@ public class RdsService {
     private ExportScheduler exportScheduler;
     private DataFileScheduler dataFileScheduler;
     private StreamScheduler streamScheduler;
+    private ResyncScheduler resyncScheduler;
 
     public RdsService(final EnhancedSourceCoordinator sourceCoordinator,
                       final RdsSourceConfig sourceConfig,
@@ -129,6 +132,10 @@ public void start(Buffer<Record<Event>> buffer) {
             streamScheduler = new StreamScheduler(
                     sourceCoordinator, sourceConfig, s3PathPrefix, binaryLogClientFactory, buffer, pluginMetrics, acknowledgementSetManager, pluginConfigObservable);
             runnableList.add(streamScheduler);
+
+            resyncScheduler = new ResyncScheduler(
+                    sourceCoordinator, sourceConfig, getQueryManager(sourceConfig, dbMetadata), s3PathPrefix, buffer, pluginMetrics, acknowledgementSetManager);
+            runnableList.add(resyncScheduler);
         }
 
         executor = Executors.newFixedThreadPool(runnableList.size());
@@ -158,7 +165,7 @@ public void shutdown() {
 
     private SchemaManager getSchemaManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
         final ConnectionManager connectionManager = new ConnectionManager(
-                dbMetadata.getHostName(),
+                dbMetadata.getEndpoint(),
                 dbMetadata.getPort(),
                 sourceConfig.getAuthenticationConfig().getUsername(),
                 sourceConfig.getAuthenticationConfig().getPassword(),
@@ -166,6 +173,18 @@ private SchemaManager getSchemaManager(final RdsSourceConfig sourceConfig, final
         return new SchemaManager(connectionManager);
     }
 
+    private QueryManager getQueryManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
+        final String readerEndpoint = dbMetadata.getReaderEndpoint() != null ? dbMetadata.getReaderEndpoint() : dbMetadata.getEndpoint();
+        final int readerPort = dbMetadata.getReaderPort() == 0 ?
dbMetadata.getPort() : dbMetadata.getReaderPort(); + final ConnectionManager readerConnectionManager = new ConnectionManager( + readerEndpoint, + readerPort, + sourceConfig.getAuthenticationConfig().getUsername(), + sourceConfig.getAuthenticationConfig().getPassword(), + sourceConfig.isTlsEnabled()); + return new QueryManager(readerConnectionManager); + } + private String getS3PathPrefix() { final String s3UserPathPrefix; if (sourceConfig.getS3Prefix() != null && !sourceConfig.getS3Prefix().isBlank()) { diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ResyncPartition.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ResyncPartition.java index 5e4321119d..7fee5ba4be 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ResyncPartition.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ResyncPartition.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.rds.coordination.partition; +import lombok.Getter; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ResyncProgressState; @@ -18,21 +19,23 @@ public class ResyncPartition extends EnhancedSourcePartition getProgressState() { } return Optional.empty(); } + + public PartitionKeyInfo getPartitionKeyInfo() { + return partitionKeyInfo; + } + + @Getter + public static class PartitionKeyInfo { + private final String database; + private final String table; + private final long timestamp; + + private PartitionKeyInfo(String database, String table, long timestamp) { + this.database = database; + this.table = table; + this.timestamp = timestamp; + } + + private static PartitionKeyInfo fromString(String partitionKey) { + String[] keySplits = partitionKey.split("\\|"); + if (keySplits.length != 3) { + throw new IllegalArgumentException("Invalid partition key: " + partitionKey); + } + return new PartitionKeyInfo(keySplits[0], keySplits[1], Long.parseLong(keySplits[2])); + } + + @Override + public String toString() { + return database + "|" + table + "|" + timestamp; + } + } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/ClusterApiStrategy.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/ClusterApiStrategy.java index 9a74c643f8..ddcc976a6f 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/ClusterApiStrategy.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/ClusterApiStrategy.java @@ -39,7 +39,13 @@ public DbMetadata describeDb(String dbIdentifier) { try { final DescribeDbClustersResponse response = rdsClient.describeDBClusters(request); final DBCluster dbCluster = response.dbClusters().get(0); - return new DbMetadata(dbIdentifier, dbCluster.endpoint(), dbCluster.port()); + return DbMetadata.builder() + .dbIdentifier(dbIdentifier) + .endpoint(dbCluster.endpoint()) + .port(dbCluster.port()) + .readerEndpoint(dbCluster.readerEndpoint()) + .readerPort(dbCluster.port()) + .build(); } catch (Exception e) { throw 
new RuntimeException("Failed to describe DB " + dbIdentifier, e); } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/InstanceApiStrategy.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/InstanceApiStrategy.java index 1ccb467454..322294d0e7 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/InstanceApiStrategy.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/InstanceApiStrategy.java @@ -33,14 +33,28 @@ public InstanceApiStrategy(final RdsClient rdsClient) { @Override public DbMetadata describeDb(String dbIdentifier) { - final DescribeDbInstancesRequest request = DescribeDbInstancesRequest.builder() - .dbInstanceIdentifier(dbIdentifier) - .build(); - try { + final DescribeDbInstancesRequest request = DescribeDbInstancesRequest.builder() + .dbInstanceIdentifier(dbIdentifier) + .build(); + final DescribeDbInstancesResponse response = rdsClient.describeDBInstances(request); final DBInstance dbInstance = response.dbInstances().get(0); - return new DbMetadata(dbIdentifier, dbInstance.endpoint().address(), dbInstance.endpoint().port()); + DbMetadata.DbMetadataBuilder dbMetadataBuilder = DbMetadata.builder() + .dbIdentifier(dbIdentifier) + .endpoint(dbInstance.endpoint().address()) + .port(dbInstance.endpoint().port()); + + if (dbInstance.hasReadReplicaDBInstanceIdentifiers()) { + final DescribeDbInstancesRequest readerInstanceRequest = DescribeDbInstancesRequest.builder() + .dbInstanceIdentifier(dbInstance.readReplicaDBInstanceIdentifiers().get(0)) + .build(); + final DescribeDbInstancesResponse readerInstanceResponse = rdsClient.describeDBInstances(readerInstanceRequest); + final DBInstance readerInstance = readerInstanceResponse.dbInstances().get(0); + dbMetadataBuilder.readerEndpoint(readerInstance.endpoint().address()) + .readerPort(readerInstance.endpoint().port()); + } + return dbMetadataBuilder.build(); } catch (Exception e) { throw new RuntimeException("Failed to describe DB " + dbIdentifier, e); } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/DbMetadata.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/DbMetadata.java index 9417a1cb10..c9a946ac4a 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/DbMetadata.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/DbMetadata.java @@ -5,48 +5,49 @@ package org.opensearch.dataprepper.plugins.source.rds.model; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; + +import java.util.Collections; +import java.util.HashMap; import java.util.Map; +@Getter +@AllArgsConstructor +@Builder public class DbMetadata { - private static final String DB_IDENTIFIER_KEY = "dbIdentifier"; - private static final String HOST_NAME_KEY = "hostName"; - private static final String PORT_KEY = "port"; + static final String DB_IDENTIFIER_KEY = "dbIdentifier"; + static final String ENDPOINT_KEY = "endpoint"; + static final String PORT_KEY = "port"; + static final String READER_ENDPOINT_KEY = "readerEndpoint"; + static final String READER_PORT_KEY = "readerPort"; + private final String dbIdentifier; - private final String hostName; + private final String endpoint; private 
final int port; - - public DbMetadata(final String dbIdentifier, final String hostName, final int port) { - this.dbIdentifier = dbIdentifier; - this.hostName = hostName; - this.port = port; - } - - public String getDbIdentifier() { - return dbIdentifier; - } - - public String getHostName() { - return hostName; - } - - public int getPort() { - return port; - } + private final String readerEndpoint; + private final int readerPort; public Map toMap() { - return Map.of( - DB_IDENTIFIER_KEY, dbIdentifier, - HOST_NAME_KEY, hostName, - PORT_KEY, port - ); + Map map = new HashMap<>(); + map.put(DB_IDENTIFIER_KEY, dbIdentifier); + map.put(ENDPOINT_KEY, endpoint); + map.put(PORT_KEY, port); + map.put(READER_ENDPOINT_KEY, readerEndpoint); + map.put(READER_PORT_KEY, readerPort); + + return Collections.unmodifiableMap(map); } public static DbMetadata fromMap(Map map) { return new DbMetadata( (String) map.get(DB_IDENTIFIER_KEY), - (String) map.get(HOST_NAME_KEY), - ((Integer) map.get(PORT_KEY)) + (String) map.get(ENDPOINT_KEY), + (Integer) map.get(PORT_KEY), + (String) map.get(READER_ENDPOINT_KEY), + (Integer) map.get(READER_PORT_KEY) ); } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncScheduler.java new file mode 100644 index 0000000000..d5c75e3846 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncScheduler.java @@ -0,0 +1,151 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.resync; + +import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; +import org.opensearch.dataprepper.plugins.source.rds.converter.RecordConverter; +import org.opensearch.dataprepper.plugins.source.rds.converter.StreamRecordConverter; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ResyncPartition; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; +import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class ResyncScheduler implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(ResyncScheduler.class); + + private static final int DEFAULT_TAKE_LEASE_INTERVAL_MILLIS = 60_000; + private static final int DEFAULT_NUM_RESYNC_WORKERS = 1; + + private final EnhancedSourceCoordinator sourceCoordinator; + 
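+    // Collaborators are injected by RdsService; the QueryManager is built against the reader
+    // endpoint (falling back to the writer endpoint), so resync SELECTs avoid adding load on the writer.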
+    private final RdsSourceConfig sourceConfig;
+    private final QueryManager queryManager;
+    private final String s3Prefix;
+    private final Buffer<Record<Event>> buffer;
+    private final RecordConverter recordConverter;
+    private final PluginMetrics pluginMetrics;
+    private final AcknowledgementSetManager acknowledgementSetManager;
+    private final ExecutorService resyncExecutor;
+
+    private volatile boolean shutdownRequested = false;
+
+    public ResyncScheduler(final EnhancedSourceCoordinator sourceCoordinator,
+                           final RdsSourceConfig sourceConfig,
+                           final QueryManager queryManager,
+                           final String s3Prefix,
+                           final Buffer<Record<Event>> buffer,
+                           final PluginMetrics pluginMetrics,
+                           final AcknowledgementSetManager acknowledgementSetManager) {
+        this.sourceCoordinator = sourceCoordinator;
+        this.sourceConfig = sourceConfig;
+        this.queryManager = queryManager;
+        this.s3Prefix = s3Prefix;
+        this.buffer = buffer;
+        recordConverter = new StreamRecordConverter(s3Prefix, sourceConfig.getPartitionCount());
+        this.pluginMetrics = pluginMetrics;
+        this.acknowledgementSetManager = acknowledgementSetManager;
+
+        resyncExecutor = Executors.newFixedThreadPool(DEFAULT_NUM_RESYNC_WORKERS,
+                BackgroundThreadFactory.defaultExecutorThreadFactory("rds-source-resync-worker"));
+    }
+
+    @Override
+    public void run() {
+        LOG.info("Start running Resync Scheduler");
+        ResyncPartition resyncPartition = null;
+        while (!shutdownRequested && !Thread.currentThread().isInterrupted()) {
+            try {
+                final Optional<EnhancedSourcePartition> sourcePartition = sourceCoordinator.acquireAvailablePartition(ResyncPartition.PARTITION_TYPE);
+                if (sourcePartition.isPresent()) {
+                    LOG.info("Acquired partition to perform resync");
+
+                    resyncPartition = (ResyncPartition) sourcePartition.get();
+                    processResyncPartition(resyncPartition);
+                }
+
+                try {
+                    Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
+                } catch (final InterruptedException e) {
+                    LOG.info("The ResyncScheduler was interrupted while waiting to retry, stopping processing");
+                    break;
+                }
+
+            } catch (Exception e) {
+                LOG.error("Received an exception during resync, backing off and retrying", e);
+                if (resyncPartition != null) {
+                    sourceCoordinator.giveUpPartition(resyncPartition);
+                }
+
+                try {
+                    Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
+                } catch (final InterruptedException ex) {
+                    LOG.info("The ResyncScheduler was interrupted while waiting to retry, stopping processing");
+                    break;
+                }
+            }
+        }
+    }
+
+    public void shutdown() {
+        shutdownRequested = true;
+    }
+
+    private void processResyncPartition(ResyncPartition resyncPartition) {
+        LOG.info("Processing resync partition: {}", resyncPartition.getPartitionKey());
+
+        AcknowledgementSet acknowledgementSet = null;
+        if (sourceConfig.isAcknowledgmentsEnabled()) {
+            acknowledgementSet = acknowledgementSetManager.create((result) -> {
+                if (result) {
+                    sourceCoordinator.completePartition(resyncPartition);
+                    LOG.info("Received acknowledgment of completion from sink for resync partition {}", resyncPartition.getPartitionKey());
+                } else {
+                    LOG.warn("Negative acknowledgment received for resync partition {}, retrying", resyncPartition.getPartitionKey());
+                    sourceCoordinator.giveUpPartition(resyncPartition);
+                }
+            }, sourceConfig.getStreamAcknowledgmentTimeout());
+        }
+
+        final ResyncWorker resyncWorker = ResyncWorker.create(
+                resyncPartition, sourceConfig, queryManager, buffer, recordConverter, acknowledgementSet, getDBTableMetadata());
+
+        CompletableFuture.runAsync(resyncWorker, resyncExecutor)
+                .whenComplete((v, ex) -> {
+                    if (ex != null) {
+                        LOG.error("There was an exception while processing a resync partition", ex);
+                        sourceCoordinator.giveUpPartition(resyncPartition);
+                    } else {
+                        LOG.info("Completed processing resync partition {}", resyncPartition.getPartitionKey());
+                        if (!sourceConfig.isAcknowledgmentsEnabled()) {
+                            sourceCoordinator.completePartition(resyncPartition);
+                        }
+                        // else ack set will decide whether to complete or give up the partition
+                    }
+                });
+    }
+
+    private DbTableMetadata getDBTableMetadata() {
+        final Optional<EnhancedSourcePartition> globalStatePartition = sourceCoordinator.getPartition(sourceConfig.getDbIdentifier());
+        final GlobalState globalState = (GlobalState) globalStatePartition.get();
+        return DbTableMetadata.fromMap(globalState.getProgressState().get());
+    }
+}
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorker.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorker.java
new file mode 100644
index 0000000000..f7c9c538d8
--- /dev/null
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorker.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.rds.resync;
+
+import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
+import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.JacksonEvent;
+import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
+import org.opensearch.dataprepper.plugins.source.rds.converter.RecordConverter;
+import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ResyncPartition;
+import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ResyncProgressState;
+import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHelper;
+import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;
+import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
+import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata.DOT_DELIMITER;
+
+public class ResyncWorker implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(ResyncWorker.class);
+    private static final String QUERY_NULL_FORMAT_STRING = "SELECT * FROM %s WHERE %s IS NULL";
+    private static final String QUERY_NOT_NULL_FORMAT_STRING = "SELECT * FROM %s WHERE %s='%s'";
+
+    static final String DATA_PREPPER_EVENT_TYPE = "event";
+    static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
+    static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
+
+    private final ResyncPartition resyncPartition;
+    private final RdsSourceConfig sourceConfig;
+    private final QueryManager queryManager;
+    private final Buffer<Record<Event>> buffer;
+    private final RecordConverter recordConverter;
+    private final AcknowledgementSet acknowledgementSet;
+    private final DbTableMetadata dbTableMetadata;
+
+    ResyncWorker(ResyncPartition resyncPartition,
RdsSourceConfig sourceConfig, + QueryManager queryManager, + Buffer> buffer, + RecordConverter recordConverter, + AcknowledgementSet acknowledgementSet, + DbTableMetadata dbTableMetadata) { + this.resyncPartition = resyncPartition; + this.sourceConfig = sourceConfig; + this.queryManager = queryManager; + this.buffer = buffer; + this.recordConverter = recordConverter; + this.acknowledgementSet = acknowledgementSet; + this.dbTableMetadata = dbTableMetadata; + } + + public static ResyncWorker create(ResyncPartition resyncPartition, + RdsSourceConfig sourceConfig, + QueryManager queryManager, + Buffer> buffer, + RecordConverter recordConverter, + AcknowledgementSet acknowledgementSet, + DbTableMetadata dbTableMetadata) { + return new ResyncWorker(resyncPartition, sourceConfig, queryManager, buffer, recordConverter, acknowledgementSet, dbTableMetadata); + } + + public void run() { + ResyncPartition.PartitionKeyInfo partitionKeyInfo = resyncPartition.getPartitionKeyInfo(); + final String database = partitionKeyInfo.getDatabase(); + final String table = partitionKeyInfo.getTable(); + final long eventTimestampMillis = partitionKeyInfo.getTimestamp(); + + final ResyncProgressState progressState = validateAndGetProgressState(); + final String foreignKeyName = progressState.getForeignKeyName(); + final Object updatedValue = progressState.getUpdatedValue(); + final List primaryKeys = progressState.getPrimaryKeys(); + + final List> rows = executeQuery(database, table, foreignKeyName, updatedValue); + + processRows(rows, database, table, primaryKeys, eventTimestampMillis); + } + + private void processRows(List> rows, String database, String table, List primaryKeys, long eventTimestampMillis) { + BufferAccumulator> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); + + for (Map row : rows) { + final Event dataPrepperEvent = JacksonEvent.builder() + .withEventType(DATA_PREPPER_EVENT_TYPE) + .withData(mapDataType(row, database + DOT_DELIMITER + table)) + .build(); + + final Event pipelineEvent = recordConverter.convert( + dataPrepperEvent, + database, + table, + OpenSearchBulkActions.INDEX, + primaryKeys, + eventTimestampMillis, + eventTimestampMillis, + null); + + if (acknowledgementSet != null) { + acknowledgementSet.add(pipelineEvent); + } + + try { + bufferAccumulator.add(new Record<>(pipelineEvent)); + } catch (Exception e) { + LOG.error("Failed to add event to buffer", e); + throw new RuntimeException(e); + } + } + + try { + bufferAccumulator.flush(); + if (acknowledgementSet != null) { + acknowledgementSet.complete(); + } + } catch (Exception e) { + // this will only happen if writing to buffer gets interrupted from shutdown, + // otherwise bufferAccumulator will keep retrying with backoff + LOG.error("Failed to flush buffer", e); + } + } + + private ResyncProgressState validateAndGetProgressState() { + return resyncPartition.getProgressState() + .orElseThrow(() -> new IllegalStateException( + "ResyncPartition " + resyncPartition.getPartitionKey() + " doesn't contain progress state.")); + } + + private List> executeQuery(String database, String table, String foreignKeyName, Object updatedValue) { + LOG.debug("Will perform resync on table: {}.{}, with foreign key name: {}, and updated value: {}", database, table, foreignKeyName, updatedValue); + final String fullTableName = database + DOT_DELIMITER + table; + String queryStatement; + if (updatedValue == null) { + queryStatement = String.format(QUERY_NULL_FORMAT_STRING, fullTableName, foreignKeyName); + } 
else {
+            queryStatement = String.format(QUERY_NOT_NULL_FORMAT_STRING, fullTableName, foreignKeyName, updatedValue);
+        }
+        LOG.debug("Query statement: {}", queryStatement);
+
+        List<Map<String, Object>> rows = queryManager.selectRows(queryStatement);
+        LOG.debug("Found {} rows to resync", rows.size());
+        return rows;
+    }
+
+    private Map<String, Object> mapDataType(final Map<String, Object> rowData, final String fullTableName) {
+        Map<String, String> columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName);
+        Map<String, Object> rowDataAfterMapping = new HashMap<>();
+        for (Map.Entry<String, Object> entry : rowData.entrySet()) {
+            final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataTypeMap.get(entry.getKey())), entry.getKey(),
+                    entry.getValue(), null);
+            rowDataAfterMapping.put(entry.getKey(), data);
+        }
+        return rowDataAfterMapping;
+    }
+}
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/QueryManager.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/QueryManager.java
new file mode 100644
index 0000000000..d89345fb71
--- /dev/null
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/QueryManager.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.rds.schema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public class QueryManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(QueryManager.class);
+
+    static final int NUM_OF_RETRIES = 3;
+    static final int BACKOFF_IN_MILLIS = 500;
+
+    private final ConnectionManager connectionManager;
+
+    public QueryManager(ConnectionManager connectionManager) {
+        this.connectionManager = connectionManager;
+    }
+
+    public List<Map<String, Object>> selectRows(String query) {
+        return executeWithRetry(this::doSelectRows, query);
+    }
+
+    private List<Map<String, Object>> doSelectRows(String query) {
+        try (final Connection connection = connectionManager.getConnection();
+             final Statement statement = connection.createStatement();
+             final ResultSet resultSet = statement.executeQuery(query)) {
+            return convertResultSetToList(resultSet);
+        } catch (Exception e) {
+            LOG.error("Failed to execute query {}, retrying", query, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private List<Map<String, Object>> convertResultSetToList(ResultSet resultSet) throws SQLException {
+        ResultSetMetaData metaData = resultSet.getMetaData();
+        List<Map<String, Object>> result = new ArrayList<>();
+        while (resultSet.next()) {
+            Map<String, Object> row = new HashMap<>();
+            for (int i = 1; i <= metaData.getColumnCount(); i++) {
+                row.put(metaData.getColumnName(i), resultSet.getObject(i));
+            }
+            result.add(row);
+        }
+        return result;
+    }
+
+    private <T, R> R executeWithRetry(Function<T, R> function, T query) {
+        int retry = 0;
+        while (retry <= NUM_OF_RETRIES) {
+            try {
+                return function.apply(query);
+            } catch (Exception e) {
+                applyBackoff();
+            }
+            retry++;
+        }
+        throw new RuntimeException("Failed to execute query after " + NUM_OF_RETRIES + " retries");
+    }
+
+    private void applyBackoff() {
+        try {
+            Thread.sleep(BACKOFF_IN_MILLIS);
+        } catch (final InterruptedException e) {
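+            // Restore the interrupt flag so callers (e.g. a scheduler thread being shut down)
+            // can still observe the interruption after the backoff sleep is cut short.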
Thread.currentThread().interrupt(); + } + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactory.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactory.java index f580e59fac..b63e588f01 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactory.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactory.java @@ -31,7 +31,7 @@ public BinlogClientFactory(final RdsSourceConfig sourceConfig, public BinaryLogClient create() { BinaryLogClient binaryLogClient = new BinaryLogClient( - dbMetadata.getHostName(), + dbMetadata.getEndpoint(), dbMetadata.getPort(), username, password); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java index bbe90c7cf5..102b57f508 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java @@ -13,6 +13,7 @@ import org.mockito.MockedConstruction; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; @@ -25,6 +26,7 @@ import org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler; import org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler; import org.opensearch.dataprepper.plugins.source.rds.leader.LeaderScheduler; +import org.opensearch.dataprepper.plugins.source.rds.resync.ResyncScheduler; import org.opensearch.dataprepper.plugins.source.rds.stream.StreamScheduler; import org.opensearch.dataprepper.plugins.source.rds.utils.IdentifierShortener; import software.amazon.awssdk.services.rds.RdsClient; @@ -69,6 +71,9 @@ class RdsServiceTest { @Mock private ExecutorService executor; + @Mock + private ExecutorService resyncExecutor; + @Mock private EventFactory eventFactory; @@ -149,16 +154,21 @@ void test_normal_service_start_when_stream_is_enabled() { final RdsService rdsService = createObjectUnderTest(); final String[] s3PrefixArray = new String[1]; + + final BackgroundThreadFactory threadFactory = mock(BackgroundThreadFactory.class); try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class); final MockedConstruction leaderSchedulerMockedConstruction = mockConstruction(LeaderScheduler.class, - (mock, context) -> s3PrefixArray[0] = (String) context.arguments().get(2))) { + (mock, context) -> s3PrefixArray[0] = (String) context.arguments().get(2)); + final MockedStatic backgroundThreadFactoryMockedStatic = mockStatic(BackgroundThreadFactory.class)) { executorsMockedStatic.when(() -> Executors.newFixedThreadPool(anyInt())).thenReturn(executor); + backgroundThreadFactoryMockedStatic.when(() -> BackgroundThreadFactory.defaultExecutorThreadFactory(any())).thenReturn(threadFactory); rdsService.start(buffer); } assertThat(s3PrefixArray[0], equalTo(s3Prefix + S3_PATH_DELIMITER + 
IdentifierShortener.shortenIdentifier(partitionPrefix, MAX_SOURCE_IDENTIFIER_LENGTH))); verify(executor).submit(any(LeaderScheduler.class)); verify(executor).submit(any(StreamScheduler.class)); + verify(executor).submit(any(ResyncScheduler.class)); verify(executor, never()).submit(any(ExportScheduler.class)); verify(executor, never()).submit(any(DataFileScheduler.class)); } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ResyncPartitionTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ResyncPartitionTest.java new file mode 100644 index 0000000000..4de2fcefa5 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ResyncPartitionTest.java @@ -0,0 +1,92 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.coordination.partition; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ResyncProgressState; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ResyncPartitionTest { + + @Mock + SourcePartitionStoreItem sourcePartitionStoreItem; + + @Mock + ResyncProgressState resyncProgressState; + + private ResyncPartition resyncPartition; + + @BeforeEach + void setUp() { + } + + @Test + void test_createPartition_from_parameters() { + final String database = UUID.randomUUID().toString(); + final String table = UUID.randomUUID().toString(); + final long timestamp = 1234567890L; + + resyncPartition = new ResyncPartition(database, table, timestamp, resyncProgressState); + + assertThat(resyncPartition.getPartitionType(), is(ResyncPartition.PARTITION_TYPE)); + assertThat(resyncPartition.getPartitionKey(), is(database + "|" + table + "|" + timestamp)); + assertThat(resyncPartition.getProgressState(), is(Optional.of(resyncProgressState))); + } + + @Test + void test_createPartition_from_storeItem() throws JsonProcessingException { + final String database = UUID.randomUUID().toString(); + final String table = UUID.randomUUID().toString(); + final long timestamp = 1234567890L; + when(sourcePartitionStoreItem.getSourcePartitionKey()).thenReturn(database + "|" + table + "|" + timestamp); + final String foreignKeyName = UUID.randomUUID().toString(); + final String updatedValue = UUID.randomUUID().toString(); + final String primaryKeyName = UUID.randomUUID().toString(); + ObjectMapper mapper = new ObjectMapper(); + final Map progressStateMap = Map.of( + "foreignKeyName", foreignKeyName, + "updatedValue", updatedValue, + "primaryKeys", List.of(primaryKeyName) + ); + final String progressStateString = mapper.writeValueAsString(progressStateMap); + 
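+        // The store item carries the progress state as a JSON string; the ResyncPartition
+        // constructor is expected to deserialize it back into a ResyncProgressState.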
when(sourcePartitionStoreItem.getPartitionProgressState()).thenReturn(progressStateString); + + resyncPartition = new ResyncPartition(sourcePartitionStoreItem); + + assertThat(resyncPartition.getPartitionType(), is(ResyncPartition.PARTITION_TYPE)); + assertThat(resyncPartition.getPartitionKey(), is(database + "|" + table + "|" + timestamp)); + + assertThat(resyncPartition.getProgressState().isPresent(), is(true)); + ResyncProgressState progressState = resyncPartition.getProgressState().get(); + assertThat(progressState.getForeignKeyName(), is(foreignKeyName)); + assertThat(progressState.getUpdatedValue(), is(updatedValue)); + assertThat(progressState.getPrimaryKeys(), is(List.of(primaryKeyName))); + } + + @Test + void test_createPartition_from_storeItem_with_invalid_partition_key_then_throws_exception() { + when(sourcePartitionStoreItem.getSourcePartitionKey()).thenReturn("invalid partition key"); + + assertThrows(IllegalArgumentException.class, () -> new ResyncPartition(sourcePartitionStoreItem)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java index 34ec5f7d60..150ad209b2 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java @@ -213,7 +213,11 @@ private void mockDbTableMetadata() { final String hostName = UUID.randomUUID().toString(); final int port = new Random().nextInt(); - final DbMetadata dbMetadata = new DbMetadata(dbIdentifier, hostName, port); + final DbMetadata dbMetadata = DbMetadata.builder() + .dbIdentifier(dbIdentifier) + .endpoint(hostName) + .port(port) + .build(); final Map map = new HashMap<>(); map.put("dbMetadata", dbMetadata.toMap()); map.put("tableColumnDataTypeMap", tableColumnDataTypeMap); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/ClusterApiStrategyTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/ClusterApiStrategyTest.java index a64604e035..812d04c2ee 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/ClusterApiStrategyTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/ClusterApiStrategyTest.java @@ -54,15 +54,17 @@ void setUp() { @Test void test_describeDb_returns_correct_results() { final String dbClusterId = UUID.randomUUID().toString(); - final String host = UUID.randomUUID().toString(); + final String endpoint = UUID.randomUUID().toString(); final int port = random.nextInt(); + final String readerEndpoint = UUID.randomUUID().toString(); final DescribeDbClustersRequest describeDbClustersRequest = DescribeDbClustersRequest.builder() .dbClusterIdentifier(dbClusterId) .build(); final DescribeDbClustersResponse describeDbClustersResponse = DescribeDbClustersResponse.builder() .dbClusters(DBCluster.builder() - .endpoint(host) + .endpoint(endpoint) .port(port) + .readerEndpoint(readerEndpoint) .build()) .build(); when(rdsClient.describeDBClusters(describeDbClustersRequest)).thenReturn(describeDbClustersResponse); @@ -70,8 +72,10 @@ void 
test_describeDb_returns_correct_results() { DbMetadata dbMetadata = objectUnderTest.describeDb(dbClusterId); assertThat(dbMetadata.getDbIdentifier(), equalTo(dbClusterId)); - assertThat(dbMetadata.getHostName(), equalTo(host)); + assertThat(dbMetadata.getEndpoint(), equalTo(endpoint)); assertThat(dbMetadata.getPort(), equalTo(port)); + assertThat(dbMetadata.getReaderEndpoint(), equalTo(readerEndpoint)); + assertThat(dbMetadata.getReaderPort(), equalTo(port)); } @Test diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/InstanceApiStrategyTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/InstanceApiStrategyTest.java index 8699c4b4be..fa94082f80 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/InstanceApiStrategyTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/InstanceApiStrategyTest.java @@ -52,7 +52,7 @@ void setUp() { } @Test - void test_describeDb_returns_correct_results() { + void test_describeDb_without_read_replicas_returns_correct_results() { final String dbInstanceId = UUID.randomUUID().toString(); final String host = UUID.randomUUID().toString(); final int port = random.nextInt(); @@ -72,10 +72,53 @@ void test_describeDb_returns_correct_results() { DbMetadata dbMetadata = objectUnderTest.describeDb(dbInstanceId); assertThat(dbMetadata.getDbIdentifier(), equalTo(dbInstanceId)); - assertThat(dbMetadata.getHostName(), equalTo(host)); + assertThat(dbMetadata.getEndpoint(), equalTo(host)); assertThat(dbMetadata.getPort(), equalTo(port)); } + @Test + void test_describeDb_with_read_replicas_returns_correct_results() { + final String dbInstanceId = UUID.randomUUID().toString(); + final String endpoint = UUID.randomUUID().toString(); + final int port = random.nextInt(); + final String readerEndpoint = UUID.randomUUID().toString(); + final String readerInstanceId = UUID.randomUUID().toString(); + final DescribeDbInstancesRequest describeDbInstancesRequest = DescribeDbInstancesRequest.builder() + .dbInstanceIdentifier(dbInstanceId) + .build(); + final DescribeDbInstancesResponse describeDbInstancesResponse = DescribeDbInstancesResponse.builder() + .dbInstances(DBInstance.builder() + .endpoint(Endpoint.builder() + .address(endpoint) + .port(port) + .build()) + .readReplicaDBInstanceIdentifiers(readerInstanceId) + .build()) + .build(); + when(rdsClient.describeDBInstances(describeDbInstancesRequest)).thenReturn(describeDbInstancesResponse); + + final DescribeDbInstancesRequest describeReaderInstancesRequest = DescribeDbInstancesRequest.builder() + .dbInstanceIdentifier(readerInstanceId) + .build(); + final DescribeDbInstancesResponse describeReaderInstancesResponse = DescribeDbInstancesResponse.builder() + .dbInstances(DBInstance.builder() + .endpoint(Endpoint.builder() + .address(readerEndpoint) + .port(port) + .build()) + .build()) + .build(); + when(rdsClient.describeDBInstances(describeReaderInstancesRequest)).thenReturn(describeReaderInstancesResponse); + + DbMetadata dbMetadata = objectUnderTest.describeDb(dbInstanceId); + + assertThat(dbMetadata.getDbIdentifier(), equalTo(dbInstanceId)); + assertThat(dbMetadata.getEndpoint(), equalTo(endpoint)); + assertThat(dbMetadata.getPort(), equalTo(port)); + assertThat(dbMetadata.getReaderEndpoint(), equalTo(readerEndpoint)); + assertThat(dbMetadata.getReaderPort(), equalTo(port)); + } + @Test void 
test_create_snapshot_with_success() { final String dbInstanceId = UUID.randomUUID().toString(); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/DbMetadataTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/DbMetadataTest.java index 737d5423e5..1cbf150654 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/DbMetadataTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/DbMetadataTest.java @@ -10,39 +10,58 @@ import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.CoreMatchers.is; +import static org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata.DB_IDENTIFIER_KEY; +import static org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata.ENDPOINT_KEY; +import static org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata.PORT_KEY; +import static org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata.READER_ENDPOINT_KEY; +import static org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata.READER_PORT_KEY; public class DbMetadataTest { @Test public void test_fromMap_success() { final String dbIdentifier = UUID.randomUUID().toString(); - final String hostName = UUID.randomUUID().toString(); + final String endpoint = UUID.randomUUID().toString(); final int port = new Random().nextInt(); + final String readerEndpoint = UUID.randomUUID().toString(); final Map map = new HashMap<>(); - map.put("dbIdentifier", dbIdentifier); - map.put("hostName", hostName); - map.put("port", port); + map.put(DB_IDENTIFIER_KEY, dbIdentifier); + map.put(ENDPOINT_KEY, endpoint); + map.put(PORT_KEY, port); + map.put(READER_ENDPOINT_KEY, readerEndpoint); + map.put(READER_PORT_KEY, port); final DbMetadata result = DbMetadata.fromMap(map); assertThat(result.getDbIdentifier(), is(dbIdentifier)); - assertThat(result.getHostName(), is(hostName)); + assertThat(result.getEndpoint(), is(endpoint)); assertThat(result.getPort(), is(port)); + assertThat(result.getReaderEndpoint(), is(readerEndpoint)); + assertThat(result.getReaderPort(), is(port)); } @Test public void test_toMap_success() { final String dbIdentifier = UUID.randomUUID().toString(); - final String hostName = UUID.randomUUID().toString(); + final String endpoint = UUID.randomUUID().toString(); final int port = new Random().nextInt(); - final DbMetadata dbMetadata = new DbMetadata(dbIdentifier, hostName, port); + final String readerEndpoint = UUID.randomUUID().toString(); + final DbMetadata dbMetadata = DbMetadata.builder() + .dbIdentifier(dbIdentifier) + .endpoint(endpoint) + .port(port) + .readerEndpoint(readerEndpoint) + .readerPort(port) + .build(); final Map result = dbMetadata.toMap(); assertThat(result, is(notNullValue())); - assertThat(result.size(), is(3)); - assertThat(result.get("dbIdentifier"), is(dbIdentifier)); - assertThat(result.get("hostName"), is(hostName)); - assertThat(result.get("port"), is(port)); + assertThat(result.size(), is(5)); + assertThat(result.get(DB_IDENTIFIER_KEY), is(dbIdentifier)); + assertThat(result.get(ENDPOINT_KEY), is(endpoint)); + assertThat(result.get(PORT_KEY), is(port)); + assertThat(result.get(READER_ENDPOINT_KEY), is(readerEndpoint)); + assertThat(result.get(READER_PORT_KEY), is(port)); } } diff --git 
a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/DbTableMetadataTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/DbTableMetadataTest.java index 16b174d5f4..2c52833426 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/DbTableMetadataTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/DbTableMetadataTest.java @@ -16,11 +16,15 @@ public class DbTableMetadataTest { @Test public void test_fromMap_success() { final String dbIdentifier = UUID.randomUUID().toString(); - final String hostName = UUID.randomUUID().toString(); + final String endpoint = UUID.randomUUID().toString(); final int port = new Random().nextInt(); final String tableName = UUID.randomUUID().toString(); - final DbMetadata dbMetadata = new DbMetadata(dbIdentifier, hostName, port); + final DbMetadata dbMetadata = DbMetadata.builder() + .dbIdentifier(dbIdentifier) + .endpoint(endpoint) + .port(port) + .build(); final Map> tableColumnDataTypeMap = new HashMap<>(); final Map columnDataTypeMap = new HashMap<>(); columnDataTypeMap.put("int_column", "INTEGER"); @@ -34,7 +38,7 @@ public void test_fromMap_success() { final DbTableMetadata result = DbTableMetadata.fromMap(map); assertThat(result.getDbMetadata().getDbIdentifier(), is(dbIdentifier)); - assertThat(result.getDbMetadata().getHostName(), is(hostName)); + assertThat(result.getDbMetadata().getEndpoint(), is(endpoint)); assertThat(result.getDbMetadata().getPort(), is(port)); assertThat(result.getTableColumnDataTypeMap(), is(tableColumnDataTypeMap)); } @@ -42,11 +46,15 @@ public void test_fromMap_success() { @Test public void test_toMap_success() { final String dbIdentifier = UUID.randomUUID().toString(); - final String hostName = UUID.randomUUID().toString(); + final String endpoint = UUID.randomUUID().toString(); final int port = new Random().nextInt(); final String tableName = UUID.randomUUID().toString(); - final DbMetadata dbMetadata = new DbMetadata(dbIdentifier, hostName, port); + final DbMetadata dbMetadata = DbMetadata.builder() + .dbIdentifier(dbIdentifier) + .endpoint(endpoint) + .port(port) + .build(); final Map> tableColumnDataTypeMap = new HashMap<>(); final Map columnDataTypeMap = new HashMap<>(); columnDataTypeMap.put("int_column", "INTEGER"); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncSchedulerTest.java new file mode 100644 index 0000000000..b8f13dc60a --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncSchedulerTest.java @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.resync; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.buffer.Buffer; +import 
org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; +import org.opensearch.dataprepper.plugins.source.rds.converter.RecordConverter; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ResyncPartition; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; +import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager; + +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ResyncSchedulerTest { + + @Mock + private EnhancedSourceCoordinator sourceCoordinator; + + @Mock + private ResyncPartition resyncPartition; + + @Mock + private RdsSourceConfig sourceConfig; + + @Mock + private QueryManager queryManager; + + @Mock + private Buffer> buffer; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + + @Mock + private DbTableMetadata dbTableMetadata; + + private String s3Prefix; + private ExecutorService resyncExecutor; + private ResyncScheduler resyncScheduler; + + + @BeforeEach + void setUp() { + s3Prefix = UUID.randomUUID().toString(); + resyncScheduler = createObjectUnderTest(); + } + + @Test + void test_run_then_complete_partition() { + final String dbIdentifier = UUID.randomUUID().toString(); + final GlobalState globalState = mock(GlobalState.class); + final Map progressState = mock(Map.class); + when(sourceCoordinator.acquireAvailablePartition(ResyncPartition.PARTITION_TYPE)).thenReturn(Optional.of(resyncPartition)); + when(sourceConfig.getDbIdentifier()).thenReturn(dbIdentifier); + when(sourceCoordinator.getPartition(dbIdentifier)).thenReturn(Optional.of(globalState)); + when(globalState.getProgressState()).thenReturn(Optional.of(progressState)); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final ResyncWorker resyncWorker = mock(ResyncWorker.class); + doNothing().when(resyncWorker).run(); + + executorService.submit(() -> { + try (MockedStatic resyncWorkerMockedStatic = mockStatic(ResyncWorker.class); + MockedStatic dbTableMetadataMockedStatic = mockStatic(DbTableMetadata.class)) { + dbTableMetadataMockedStatic.when(() -> DbTableMetadata.fromMap(progressState)).thenReturn(dbTableMetadata); + resyncWorkerMockedStatic.when(() -> ResyncWorker.create(eq(resyncPartition), eq(sourceConfig), + eq(queryManager), eq(buffer), any(RecordConverter.class), any(), eq(dbTableMetadata))).thenReturn(resyncWorker); + resyncScheduler.run(); + } + }); + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> verify(sourceCoordinator).acquireAvailablePartition(ResyncPartition.PARTITION_TYPE)); + executorService.shutdownNow(); + + verify(sourceCoordinator).completePartition(resyncPartition); + } + + 
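+    // The static mocks above are registered inside the executor task because Mockito scopes a
+    // MockedStatic to the thread that created it; ResyncScheduler.run() must observe the stubbed
+    // ResyncWorker.create and DbTableMetadata.fromMap calls on that same worker thread.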
private ResyncScheduler createObjectUnderTest() { + return new ResyncScheduler(sourceCoordinator, sourceConfig, queryManager, s3Prefix, buffer, pluginMetrics, acknowledgementSetManager); + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorkerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorkerTest.java new file mode 100644 index 0000000000..ee140194d2 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorkerTest.java @@ -0,0 +1,133 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.resync; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; +import org.opensearch.dataprepper.plugins.source.rds.converter.RecordConverter; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ResyncPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ResyncProgressState; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; +import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata.DOT_DELIMITER; + +@ExtendWith(MockitoExtension.class) +class ResyncWorkerTest { + + @Mock + private ResyncPartition resyncPartition; + + @Mock + private RdsSourceConfig sourceConfig; + + @Mock + private QueryManager queryManager; + + @Mock + private Buffer> buffer; + + @Mock + private RecordConverter recordConverter; + + @Mock + private AcknowledgementSet acknowledgementSet; + + @Mock + private DbTableMetadata dbTableMetadata; + + private ResyncWorker resyncWorker; + + @BeforeEach + void setUp() { + resyncWorker = createObjectUnderTest(); + } + + @Test + void test_run_process_resync_with_acknowledgments_enabled() throws Exception { + final String database = "test-database"; + final String table = "test-table"; + final long eventTimestampMillis = 1234567890L; + final ResyncPartition.PartitionKeyInfo partitionKeyInfo = mock(ResyncPartition.PartitionKeyInfo.class); + final ResyncProgressState progressState = mock(ResyncProgressState.class); + final String foreignKeyName = "test-foreign-key"; + final Object 
updatedValue = "test-updated-value"; + final String queryStatement = "SELECT * FROM " + database + "." + table + " WHERE " + foreignKeyName + "='" + updatedValue + "'"; + final String primaryKeyName = "test-primary-key"; + final String primaryKeyValue = "test-primary-key-value"; + final Map rowData = Map.of( + primaryKeyName, primaryKeyValue, + foreignKeyName, updatedValue + ); + final List> rows = List.of(rowData); + final Map> tableColumnTypeMap = Map.of( + database + DOT_DELIMITER + table, Map.of( + primaryKeyName, "varchar", + foreignKeyName, "varchar" + ) + ); + final Event dataPrepperEvent = mock(Event.class); + + when(resyncPartition.getPartitionKeyInfo()).thenReturn(partitionKeyInfo); + when(partitionKeyInfo.getDatabase()).thenReturn(database); + when(partitionKeyInfo.getTable()).thenReturn(table); + when(partitionKeyInfo.getTimestamp()).thenReturn(eventTimestampMillis); + when(resyncPartition.getProgressState()).thenReturn(Optional.of(progressState)); + when(progressState.getForeignKeyName()).thenReturn(foreignKeyName); + when(progressState.getUpdatedValue()).thenReturn(updatedValue); + when(progressState.getPrimaryKeys()).thenReturn(List.of(primaryKeyName)); + when(queryManager.selectRows(queryStatement)).thenReturn(rows); + when(dbTableMetadata.getTableColumnDataTypeMap()).thenReturn(tableColumnTypeMap); + when(recordConverter.convert(any(Event.class), eq(database), eq(table), eq(OpenSearchBulkActions.INDEX), + eq(List.of(primaryKeyName)), eq(eventTimestampMillis), eq(eventTimestampMillis), eq(null))) + .thenReturn(dataPrepperEvent); + + final BufferAccumulator> bufferAccumulator = mock(BufferAccumulator.class); + try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, ResyncWorker.DEFAULT_BUFFER_BATCH_SIZE, ResyncWorker.BUFFER_TIMEOUT)) + .thenReturn(bufferAccumulator); + resyncWorker.run(); + } + + verify(acknowledgementSet).add(dataPrepperEvent); + + ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + verify(bufferAccumulator).add(recordArgumentCaptor.capture()); + final Record record = recordArgumentCaptor.getValue(); + assertThat(record.getData(), is(dataPrepperEvent)); + + verify(bufferAccumulator).flush(); + verify(acknowledgementSet).complete(); + } + + private ResyncWorker createObjectUnderTest() { + return ResyncWorker.create(resyncPartition, sourceConfig, queryManager, buffer, recordConverter, acknowledgementSet, dbTableMetadata); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/schema/QueryManagerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/schema/QueryManagerTest.java new file mode 100644 index 0000000000..9ed44e908c --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/schema/QueryManagerTest.java @@ -0,0 +1,85 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.schema; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; 
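+// Note: the Connection mock below uses Answers.RETURNS_DEEP_STUBS, so chained JDBC calls return
+// stubs automatically; statement and result-set behavior is still stubbed explicitly per test.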
+import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +@ExtendWith(MockitoExtension.class) +class QueryManagerTest { + + @Mock + private ConnectionManager connectionManager; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private Connection connection; + + @Mock + private ResultSet resultSet; + + private QueryManager queryManager; + + @BeforeEach + void setUp() { + queryManager = createObjectUnderTest(); + } + + @Test + void test_selectRows_returns_expected_list() throws SQLException { + final Statement statement = mock(Statement.class); + final String query = UUID.randomUUID().toString(); + final ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class); + final String column1 = UUID.randomUUID().toString(); + final Object value1 = UUID.randomUUID().toString(); + when(connectionManager.getConnection()).thenReturn(connection); + when(connection.createStatement()).thenReturn(statement); + when(statement.executeQuery(query)).thenReturn(resultSet); + when(resultSet.getMetaData()).thenReturn(resultSetMetaData); + when(resultSet.next()).thenReturn(true, false); + when(resultSetMetaData.getColumnCount()).thenReturn(1); + when(resultSetMetaData.getColumnName(1)).thenReturn(column1); + when(resultSet.getObject(1)).thenReturn(value1); + + final List> result = queryManager.selectRows(query); + + assertThat(result.size(), is(1)); + final Map row = result.get(0); + assertThat(row.size(), is(1)); + assertThat(row.get(column1), is(value1)); + } + + @Test + void test_selectRows_throws_when_fails_get_query_results() throws SQLException { + final String query = UUID.randomUUID().toString(); + when(connectionManager.getConnection()).thenThrow(new RuntimeException()); + + assertThrows(RuntimeException.class, () -> queryManager.selectRows(query)); + } + + private QueryManager createObjectUnderTest() { + return new QueryManager(connectionManager); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactoryTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactoryTest.java index 53ef6028db..c56ffd94d5 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactoryTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactoryTest.java @@ -43,7 +43,7 @@ void test_create() { binlogClientFactory = createObjectUnderTest(); binlogClientFactory.create(); - verify(dbMetadata).getHostName(); + verify(dbMetadata).getEndpoint(); verify(dbMetadata).getPort(); } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresherTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresherTest.java index 4e6edadd5b..7647a5c008 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresherTest.java +++ 
b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresherTest.java
@@ -240,7 +240,11 @@ private Map<String, Object> getDbTableMetaDataMap() {
         final int port = new Random().nextInt();
         final String tableName = UUID.randomUUID().toString();
 
-        final DbMetadata dbMetadata = new DbMetadata(dbIdentifier, hostName, port);
+        final DbMetadata dbMetadata = DbMetadata.builder()
+                .dbIdentifier(dbIdentifier)
+                .endpoint(hostName)
+                .port(port)
+                .build();
         final Map<String, Map<String, String>> tableColumnDataTypeMap = new HashMap<>();
         final Map<String, String> columnDataTypeMap = new HashMap<>();
         columnDataTypeMap.put("int_column", "INTEGER");
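Note on the resync flow above: a minimal, self-contained sketch of how a resync partition key and query are derived, assuming the database|table|timestamp key layout from ResyncPartition and the query templates from ResyncWorker (all identifiers below, such as my_db, orders, and customer_id, are illustrative):

    // Partition key round trip, mirroring ResyncPartition.PartitionKeyInfo.
    final String partitionKey = "my_db|orders|1723400000000";
    final String[] parts = partitionKey.split("\\|");     // fromString() rejects anything but exactly 3 parts
    final String database = parts[0];
    final String table = parts[1];
    final long eventTimestampMillis = Long.parseLong(parts[2]);

    // Query selection, as in ResyncWorker.executeQuery(): a NULL foreign-key value needs IS NULL,
    // any other value is matched by equality.
    final String fullTableName = database + "." + table;  // joined with TableMetadata.DOT_DELIMITER
    final Object updatedValue = "new-value";              // from ResyncProgressState.getUpdatedValue()
    final String query = updatedValue == null
            ? String.format("SELECT * FROM %s WHERE %s IS NULL", fullTableName, "customer_id")
            : String.format("SELECT * FROM %s WHERE %s='%s'", fullTableName, "customer_id", updatedValue);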