From e725ce6e892e1eb2d8c6edc7b2bd1c3a6bc3c3ad Mon Sep 17 00:00:00 2001 From: Hai Yan Date: Thu, 14 Nov 2024 12:35:23 -0600 Subject: [PATCH] Add unit tests Signed-off-by: Hai Yan --- .../rds/stream/BinlogEventListener.java | 12 ++-- .../rds/stream/BinlogEventListenerTest.java | 63 +++++++++++++++++++ 2 files changed, 70 insertions(+), 5 deletions(-) diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java index c7935a5420..4491a7c643 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java @@ -319,7 +319,8 @@ void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) { handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.DELETE)); } - private boolean isValidTableId(long tableId) { + // Visible For Testing + boolean isValidTableId(long tableId) { if (!tableMetadataMap.containsKey(tableId)) { LOG.debug("Cannot find table metadata, the event is likely not from a table of interest or the table metadata was not read"); return false; @@ -333,10 +334,11 @@ private boolean isValidTableId(long tableId) { return true; } - private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event, - long tableId, - List rows, - List bulkActions) { + // Visible For Testing + void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event, + long tableId, + List rows, + List bulkActions) { // Update binlog coordinate after it's first assigned in rotate event handler if (currentBinlogCoordinate != null) { diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java index 92738b971e..95035f4501 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java @@ -7,6 +7,7 @@ import com.github.shyiko.mysql.binlog.BinaryLogClient; import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; @@ -24,22 +25,32 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; import org.opensearch.dataprepper.plugins.source.rds.resync.CascadingActionDetector; import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Field; +import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -152,6 +163,58 @@ void test_given_UpdateRows_event_then_calls_correct_handler(EventType eventType) verify(objectUnderTest).handleUpdateEvent(binlogEvent); } + @ParameterizedTest + @EnumSource(names = {"UPDATE_ROWS", "EXT_UPDATE_ROWS"}) + void test_given_UpdateRows_event_when_primary_key_changes_then_generate_correct_events(EventType eventType) throws NoSuchFieldException, IllegalAccessException { + final UpdateRowsEventData data = mock(UpdateRowsEventData.class); + final Serializable[] oldCol1Data = new Serializable[]{1, "a"}; + final Serializable[] newCol1Data = new Serializable[]{2, "a"}; + final Serializable[] oldCol2Data = new Serializable[]{3, "b"}; + final Serializable[] newCol2Data = new Serializable[]{1, "b"}; + final List> rows = List.of( + Map.entry(oldCol1Data, newCol1Data), + Map.entry(oldCol2Data, newCol2Data) + ); + final long tableId = 1234L; + when(binlogEvent.getHeader().getEventType()).thenReturn(eventType); + when(binlogEvent.getData()).thenReturn(data); + when(data.getTableId()).thenReturn(tableId); + when(objectUnderTest.isValidTableId(tableId)).thenReturn(true); + when(data.getRows()).thenReturn(rows); + + // Set tableMetadataMap reflectively + final TableMetadata tableMetadata = mock(TableMetadata.class); + final Map tableMetadataMap = Map.of(tableId, tableMetadata); + Field tableMetadataMapField = BinlogEventListener.class.getDeclaredField("tableMetadataMap"); + tableMetadataMapField.setAccessible(true); + tableMetadataMapField.set(objectUnderTest, tableMetadataMap); + when(tableMetadata.getPrimaryKeys()).thenReturn(List.of("col1")); + when(tableMetadata.getColumnNames()).thenReturn(List.of("col1", "col2")); + + objectUnderTest.onEvent(binlogEvent); + + verifyHandlerCallHelper(); + verify(objectUnderTest).handleUpdateEvent(binlogEvent); + + // verify rowList and bulkActionList that were sent to handleRowChangeEvent() were correct + ArgumentCaptor> rowListArgumentCaptor = ArgumentCaptor.forClass(List.class); + ArgumentCaptor> bulkActionListArgumentCaptor = ArgumentCaptor.forClass(List.class); + verify(objectUnderTest).handleRowChangeEvent(eq(binlogEvent), eq(tableId), rowListArgumentCaptor.capture(), bulkActionListArgumentCaptor.capture()); + List rowList = rowListArgumentCaptor.getValue(); + List bulkActionList = bulkActionListArgumentCaptor.getValue(); + + assertThat(rowList.size(), is(4)); + assertThat(bulkActionList.size(), is(4)); + assertThat(rowList.get(0), is(oldCol1Data)); + assertThat(bulkActionList.get(0), is(OpenSearchBulkActions.DELETE)); + assertThat(rowList.get(1), is(newCol1Data)); + assertThat(bulkActionList.get(1), is(OpenSearchBulkActions.INDEX)); + assertThat(rowList.get(2), is(oldCol2Data)); + assertThat(bulkActionList.get(2), is(OpenSearchBulkActions.DELETE)); + assertThat(rowList.get(3), is(newCol2Data)); + assertThat(bulkActionList.get(3), is(OpenSearchBulkActions.INDEX)); + } + @ParameterizedTest @EnumSource(names = {"DELETE_ROWS", "EXT_DELETE_ROWS"}) void test_given_DeleteRows_event_then_calls_correct_handler(EventType eventType) {