diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ResyncPartition.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ResyncPartition.java index 5e4321119d..7fee5ba4be 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ResyncPartition.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ResyncPartition.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.rds.coordination.partition; +import lombok.Getter; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ResyncProgressState; @@ -18,21 +19,23 @@ public class ResyncPartition extends EnhancedSourcePartition getProgressState() { } return Optional.empty(); } + + public PartitionKeyInfo getPartitionKeyInfo() { + return partitionKeyInfo; + } + + @Getter + public static class PartitionKeyInfo { + private final String database; + private final String table; + private final long timestamp; + + private PartitionKeyInfo(String database, String table, long timestamp) { + this.database = database; + this.table = table; + this.timestamp = timestamp; + } + + private static PartitionKeyInfo fromString(String partitionKey) { + String[] keySplits = partitionKey.split("\\|"); + if (keySplits.length != 3) { + throw new IllegalArgumentException("Invalid partition key: " + partitionKey); + } + return new PartitionKeyInfo(keySplits[0], keySplits[1], Long.parseLong(keySplits[2])); + } + + @Override + public String toString() { + return database + "|" + table + "|" + timestamp; + } + } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncScheduler.java index 988449ca5c..d5c75e3846 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncScheduler.java @@ -17,7 +17,9 @@ import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; import org.opensearch.dataprepper.plugins.source.rds.converter.RecordConverter; import org.opensearch.dataprepper.plugins.source.rds.converter.StreamRecordConverter; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ResyncPartition; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +70,7 @@ public ResyncScheduler(final EnhancedSourceCoordinator sourceCoordinator, @Override public void run() { - LOG.debug("Start running Stream Scheduler"); + LOG.info("Start running Resync Scheduler"); ResyncPartition resyncPartition = null; while (!shutdownRequested && !Thread.currentThread().isInterrupted()) { try { @@ -124,7 +126,7 @@ private void processResyncPartition(ResyncPartition resyncPartition) { } final ResyncWorker resyncWorker = ResyncWorker.create( - resyncPartition, sourceConfig, queryManager, buffer, recordConverter, acknowledgementSet); + resyncPartition, sourceConfig, queryManager, buffer, recordConverter, acknowledgementSet, getDBTableMetadata()); CompletableFuture.runAsync(resyncWorker, resyncExecutor) .whenComplete((v, ex) -> { @@ -140,4 +142,10 @@ private void processResyncPartition(ResyncPartition resyncPartition) { } }); } + + private DbTableMetadata getDBTableMetadata() { + final Optional globalStatePartition = sourceCoordinator.getPartition(sourceConfig.getDbIdentifier()); + final GlobalState globalState = (GlobalState) globalStatePartition.get(); + return DbTableMetadata.fromMap(globalState.getProgressState().get()); + } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorker.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorker.java index 376bca684b..f7c9c538d8 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorker.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorker.java @@ -16,16 +16,24 @@ import org.opensearch.dataprepper.plugins.source.rds.converter.RecordConverter; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ResyncPartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ResyncProgressState; +import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHelper; +import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; +import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata.DOT_DELIMITER; + public class ResyncWorker implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ResyncWorker.class); + private static final String QUERY_NULL_FORMAT_STRING = "SELECT * FROM %s WHERE %s IS NULL"; + private static final String QUERY_NOT_NULL_FORMAT_STRING = "SELECT * FROM %s WHERE %s='%s'"; static final String DATA_PREPPER_EVENT_TYPE = "event"; static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60); @@ -37,19 +45,22 @@ public class ResyncWorker implements Runnable { private final Buffer> buffer; private final RecordConverter recordConverter; private final AcknowledgementSet acknowledgementSet; + private final DbTableMetadata dbTableMetadata; ResyncWorker(ResyncPartition resyncPartition, RdsSourceConfig sourceConfig, QueryManager queryManager, Buffer> buffer, RecordConverter recordConverter, - AcknowledgementSet acknowledgementSet) { + AcknowledgementSet acknowledgementSet, + DbTableMetadata dbTableMetadata) { this.resyncPartition = resyncPartition; this.sourceConfig = sourceConfig; this.queryManager = queryManager; this.buffer = buffer; this.recordConverter = recordConverter; this.acknowledgementSet = acknowledgementSet; + this.dbTableMetadata = dbTableMetadata; } public static ResyncWorker create(ResyncPartition resyncPartition, @@ -57,42 +68,34 @@ public static ResyncWorker create(ResyncPartition resyncPartition, QueryManager queryManager, Buffer> buffer, RecordConverter recordConverter, - AcknowledgementSet acknowledgementSet) { - return new ResyncWorker(resyncPartition, sourceConfig, queryManager, buffer, recordConverter, acknowledgementSet); + AcknowledgementSet acknowledgementSet, + DbTableMetadata dbTableMetadata) { + return new ResyncWorker(resyncPartition, sourceConfig, queryManager, buffer, recordConverter, acknowledgementSet, dbTableMetadata); } public void run() { - String[] keySplits = resyncPartition.getPartitionKey().split("\\|"); - final String database = keySplits[0]; - final String table = keySplits[1]; - final long eventTimestampMillis = Long.parseLong(keySplits[2]); - - if (resyncPartition.getProgressState().isEmpty()) { - final String errorMessage = "ResyncPartition " + resyncPartition.getPartitionKey() + " doesn't contain progress state."; - throw new RuntimeException(errorMessage); - } - final ResyncProgressState progressState = resyncPartition.getProgressState().get(); + ResyncPartition.PartitionKeyInfo partitionKeyInfo = resyncPartition.getPartitionKeyInfo(); + final String database = partitionKeyInfo.getDatabase(); + final String table = partitionKeyInfo.getTable(); + final long eventTimestampMillis = partitionKeyInfo.getTimestamp(); + + final ResyncProgressState progressState = validateAndGetProgressState(); final String foreignKeyName = progressState.getForeignKeyName(); final Object updatedValue = progressState.getUpdatedValue(); + final List primaryKeys = progressState.getPrimaryKeys(); - LOG.debug("Will perform resync on table: {}.{}, with foreign key name: {}, and updated value: {}", database, table, foreignKeyName, updatedValue); - String queryStatement; - if (updatedValue == null) { - queryStatement = String.format("SELECT * FROM %s WHERE %s IS NULL", database + "." + table, foreignKeyName); - } else { - queryStatement = String.format("SELECT * FROM %s WHERE %s='%s'", database + "." + table, foreignKeyName, updatedValue); - } - LOG.debug("Query statement: {}", queryStatement); + final List> rows = executeQuery(database, table, foreignKeyName, updatedValue); - List> rows = queryManager.selectRows(queryStatement); - LOG.debug("Found {} rows to resync", rows.size()); + processRows(rows, database, table, primaryKeys, eventTimestampMillis); + } + private void processRows(List> rows, String database, String table, List primaryKeys, long eventTimestampMillis) { BufferAccumulator> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); for (Map row : rows) { final Event dataPrepperEvent = JacksonEvent.builder() .withEventType(DATA_PREPPER_EVENT_TYPE) - .withData(row) + .withData(mapDataType(row, database + DOT_DELIMITER + table)) .build(); final Event pipelineEvent = recordConverter.convert( @@ -100,7 +103,7 @@ public void run() { database, table, OpenSearchBulkActions.INDEX, - progressState.getPrimaryKeys(), + primaryKeys, eventTimestampMillis, eventTimestampMillis, null); @@ -128,4 +131,37 @@ public void run() { LOG.error("Failed to flush buffer", e); } } + + private ResyncProgressState validateAndGetProgressState() { + return resyncPartition.getProgressState() + .orElseThrow(() -> new IllegalStateException( + "ResyncPartition " + resyncPartition.getPartitionKey() + " doesn't contain progress state.")); + } + + private List> executeQuery(String database, String table, String foreignKeyName, Object updatedValue) { + LOG.debug("Will perform resync on table: {}.{}, with foreign key name: {}, and updated value: {}", database, table, foreignKeyName, updatedValue); + final String fullTableName = database + DOT_DELIMITER + table; + String queryStatement; + if (updatedValue == null) { + queryStatement = String.format(QUERY_NULL_FORMAT_STRING, fullTableName, foreignKeyName); + } else { + queryStatement = String.format(QUERY_NOT_NULL_FORMAT_STRING, fullTableName, foreignKeyName, updatedValue); + } + LOG.debug("Query statement: {}", queryStatement); + + List> rows = queryManager.selectRows(queryStatement); + LOG.debug("Found {} rows to resync", rows.size()); + return rows; + } + + private Map mapDataType(final Map rowData, final String fullTableName) { + Map columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName); + Map rowDataAfterMapping = new HashMap<>(); + for (Map.Entry entry : rowData.entrySet()) { + final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataTypeMap.get(entry.getKey())), entry.getKey(), + entry.getValue(), null); + rowDataAfterMapping.put(entry.getKey(), data); + } + return rowDataAfterMapping; + } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/QueryManager.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/QueryManager.java index ac99220e1d..d89345fb71 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/QueryManager.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/QueryManager.java @@ -17,11 +17,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; public class QueryManager { private static final Logger LOG = LoggerFactory.getLogger(QueryManager.class); + static final int NUM_OF_RETRIES = 3; + static final int BACKOFF_IN_MILLIS = 500; + private final ConnectionManager connectionManager; public QueryManager(ConnectionManager connectionManager) { @@ -29,6 +33,10 @@ public QueryManager(ConnectionManager connectionManager) { } public List> selectRows(String query) { + return executeWithRetry(this::doSelectRows, query); + } + + private List> doSelectRows(String query) { List> result = new ArrayList<>(); try (final Connection connection = connectionManager.getConnection()) { final Statement statement = connection.createStatement(); @@ -36,8 +44,8 @@ public List> selectRows(String query) { return convertResultSetToList(resultSet); } } catch (Exception e) { - LOG.error("Failed to execute query {}, retrying", query, e); - return result; + LOG.error("Failed to execute query {}, retrying", query); + throw new RuntimeException(e); } } @@ -53,4 +61,25 @@ private List> convertResultSetToList(ResultSet resultSet) th } return result; } + + private R executeWithRetry(Function function, T query) { + int retry = 0; + while (retry <= NUM_OF_RETRIES) { + try { + return function.apply(query); + } catch (Exception e) { + applyBackoff(); + } + retry++; + } + throw new RuntimeException("Failed to execute query after " + NUM_OF_RETRIES + " retries"); + } + + private void applyBackoff() { + try { + Thread.sleep(BACKOFF_IN_MILLIS); + } catch (final InterruptedException e){ + Thread.currentThread().interrupt(); + } + } } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ResyncPartitionTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ResyncPartitionTest.java new file mode 100644 index 0000000000..4de2fcefa5 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ResyncPartitionTest.java @@ -0,0 +1,92 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.coordination.partition; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ResyncProgressState; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ResyncPartitionTest { + + @Mock + SourcePartitionStoreItem sourcePartitionStoreItem; + + @Mock + ResyncProgressState resyncProgressState; + + private ResyncPartition resyncPartition; + + @BeforeEach + void setUp() { + } + + @Test + void test_createPartition_from_parameters() { + final String database = UUID.randomUUID().toString(); + final String table = UUID.randomUUID().toString(); + final long timestamp = 1234567890L; + + resyncPartition = new ResyncPartition(database, table, timestamp, resyncProgressState); + + assertThat(resyncPartition.getPartitionType(), is(ResyncPartition.PARTITION_TYPE)); + assertThat(resyncPartition.getPartitionKey(), is(database + "|" + table + "|" + timestamp)); + assertThat(resyncPartition.getProgressState(), is(Optional.of(resyncProgressState))); + } + + @Test + void test_createPartition_from_storeItem() throws JsonProcessingException { + final String database = UUID.randomUUID().toString(); + final String table = UUID.randomUUID().toString(); + final long timestamp = 1234567890L; + when(sourcePartitionStoreItem.getSourcePartitionKey()).thenReturn(database + "|" + table + "|" + timestamp); + final String foreignKeyName = UUID.randomUUID().toString(); + final String updatedValue = UUID.randomUUID().toString(); + final String primaryKeyName = UUID.randomUUID().toString(); + ObjectMapper mapper = new ObjectMapper(); + final Map progressStateMap = Map.of( + "foreignKeyName", foreignKeyName, + "updatedValue", updatedValue, + "primaryKeys", List.of(primaryKeyName) + ); + final String progressStateString = mapper.writeValueAsString(progressStateMap); + when(sourcePartitionStoreItem.getPartitionProgressState()).thenReturn(progressStateString); + + resyncPartition = new ResyncPartition(sourcePartitionStoreItem); + + assertThat(resyncPartition.getPartitionType(), is(ResyncPartition.PARTITION_TYPE)); + assertThat(resyncPartition.getPartitionKey(), is(database + "|" + table + "|" + timestamp)); + + assertThat(resyncPartition.getProgressState().isPresent(), is(true)); + ResyncProgressState progressState = resyncPartition.getProgressState().get(); + assertThat(progressState.getForeignKeyName(), is(foreignKeyName)); + assertThat(progressState.getUpdatedValue(), is(updatedValue)); + assertThat(progressState.getPrimaryKeys(), is(List.of(primaryKeyName))); + } + + @Test + void test_createPartition_from_storeItem_with_invalid_partition_key_then_throws_exception() { + when(sourcePartitionStoreItem.getSourcePartitionKey()).thenReturn("invalid partition key"); + + assertThrows(IllegalArgumentException.class, () -> new ResyncPartition(sourcePartitionStoreItem)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncSchedulerTest.java index f5d859694e..b8f13dc60a 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncSchedulerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncSchedulerTest.java @@ -19,10 +19,13 @@ import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; import org.opensearch.dataprepper.plugins.source.rds.converter.RecordConverter; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ResyncPartition; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager; import java.time.Duration; +import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -61,6 +64,9 @@ class ResyncSchedulerTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private DbTableMetadata dbTableMetadata; + private String s3Prefix; private ExecutorService resyncExecutor; private ResyncScheduler resyncScheduler; @@ -74,16 +80,24 @@ void setUp() { @Test void test_run_then_complete_partition() { + final String dbIdentifier = UUID.randomUUID().toString(); + final GlobalState globalState = mock(GlobalState.class); + final Map progressState = mock(Map.class); when(sourceCoordinator.acquireAvailablePartition(ResyncPartition.PARTITION_TYPE)).thenReturn(Optional.of(resyncPartition)); + when(sourceConfig.getDbIdentifier()).thenReturn(dbIdentifier); + when(sourceCoordinator.getPartition(dbIdentifier)).thenReturn(Optional.of(globalState)); + when(globalState.getProgressState()).thenReturn(Optional.of(progressState)); final ExecutorService executorService = Executors.newSingleThreadExecutor(); final ResyncWorker resyncWorker = mock(ResyncWorker.class); doNothing().when(resyncWorker).run(); executorService.submit(() -> { - try (MockedStatic resyncWorkerMockedStatic = mockStatic(ResyncWorker.class)) { + try (MockedStatic resyncWorkerMockedStatic = mockStatic(ResyncWorker.class); + MockedStatic dbTableMetadataMockedStatic = mockStatic(DbTableMetadata.class)) { + dbTableMetadataMockedStatic.when(() -> DbTableMetadata.fromMap(progressState)).thenReturn(dbTableMetadata); resyncWorkerMockedStatic.when(() -> ResyncWorker.create(eq(resyncPartition), eq(sourceConfig), - eq(queryManager), eq(buffer), any(RecordConverter.class), any())).thenReturn(resyncWorker); + eq(queryManager), eq(buffer), any(RecordConverter.class), any(), eq(dbTableMetadata))).thenReturn(resyncWorker); resyncScheduler.run(); } }); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorkerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorkerTest.java index b3a8662eba..ee140194d2 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorkerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncWorkerTest.java @@ -22,6 +22,7 @@ import org.opensearch.dataprepper.plugins.source.rds.converter.RecordConverter; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ResyncPartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ResyncProgressState; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager; import java.util.List; @@ -36,6 +37,7 @@ import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata.DOT_DELIMITER; @ExtendWith(MockitoExtension.class) class ResyncWorkerTest { @@ -58,6 +60,9 @@ class ResyncWorkerTest { @Mock private AcknowledgementSet acknowledgementSet; + @Mock + private DbTableMetadata dbTableMetadata; + private ResyncWorker resyncWorker; @BeforeEach @@ -70,6 +75,7 @@ void test_run_process_resync_with_acknowledgments_enabled() throws Exception { final String database = "test-database"; final String table = "test-table"; final long eventTimestampMillis = 1234567890L; + final ResyncPartition.PartitionKeyInfo partitionKeyInfo = mock(ResyncPartition.PartitionKeyInfo.class); final ResyncProgressState progressState = mock(ResyncProgressState.class); final String foreignKeyName = "test-foreign-key"; final Object updatedValue = "test-updated-value"; @@ -81,14 +87,24 @@ void test_run_process_resync_with_acknowledgments_enabled() throws Exception { foreignKeyName, updatedValue ); final List> rows = List.of(rowData); + final Map> tableColumnTypeMap = Map.of( + database + DOT_DELIMITER + table, Map.of( + primaryKeyName, "varchar", + foreignKeyName, "varchar" + ) + ); final Event dataPrepperEvent = mock(Event.class); - when(resyncPartition.getPartitionKey()).thenReturn(database + "|" + table + "|" + eventTimestampMillis); + when(resyncPartition.getPartitionKeyInfo()).thenReturn(partitionKeyInfo); + when(partitionKeyInfo.getDatabase()).thenReturn(database); + when(partitionKeyInfo.getTable()).thenReturn(table); + when(partitionKeyInfo.getTimestamp()).thenReturn(eventTimestampMillis); when(resyncPartition.getProgressState()).thenReturn(Optional.of(progressState)); when(progressState.getForeignKeyName()).thenReturn(foreignKeyName); when(progressState.getUpdatedValue()).thenReturn(updatedValue); when(progressState.getPrimaryKeys()).thenReturn(List.of(primaryKeyName)); when(queryManager.selectRows(queryStatement)).thenReturn(rows); + when(dbTableMetadata.getTableColumnDataTypeMap()).thenReturn(tableColumnTypeMap); when(recordConverter.convert(any(Event.class), eq(database), eq(table), eq(OpenSearchBulkActions.INDEX), eq(List.of(primaryKeyName)), eq(eventTimestampMillis), eq(eventTimestampMillis), eq(null))) .thenReturn(dataPrepperEvent); @@ -112,6 +128,6 @@ void test_run_process_resync_with_acknowledgments_enabled() throws Exception { } private ResyncWorker createObjectUnderTest() { - return ResyncWorker.create(resyncPartition, sourceConfig, queryManager, buffer, recordConverter, acknowledgementSet); + return ResyncWorker.create(resyncPartition, sourceConfig, queryManager, buffer, recordConverter, acknowledgementSet, dbTableMetadata); } } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/schema/QueryManagerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/schema/QueryManagerTest.java index 4383e801b7..9ed44e908c 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/schema/QueryManagerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/schema/QueryManagerTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -71,13 +72,11 @@ void test_selectRows_returns_expected_list() throws SQLException { } @Test - void test_selectRows_returns_empty_list_when_exception_occurs() throws SQLException { + void test_selectRows_throws_when_fails_get_query_results() throws SQLException { final String query = UUID.randomUUID().toString(); when(connectionManager.getConnection()).thenThrow(new RuntimeException()); - final List> result = queryManager.selectRows(query); - - assertThat(result.size(), is(0)); + assertThrows(RuntimeException.class, () -> queryManager.selectRows(query)); } private QueryManager createObjectUnderTest() {