Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh committed Nov 14, 2024
1 parent 84d2fe9 commit e725ce6
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.DELETE));
}

private boolean isValidTableId(long tableId) {
// Visible For Testing
boolean isValidTableId(long tableId) {
if (!tableMetadataMap.containsKey(tableId)) {
LOG.debug("Cannot find table metadata, the event is likely not from a table of interest or the table metadata was not read");
return false;
Expand All @@ -333,10 +334,11 @@ private boolean isValidTableId(long tableId) {
return true;
}

private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
long tableId,
List<Serializable[]> rows,
List<OpenSearchBulkActions> bulkActions) {
// Visible For Testing
void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
long tableId,
List<Serializable[]> rows,
List<OpenSearchBulkActions> bulkActions) {

// Update binlog coordinate after it's first assigned in rotate event handler
if (currentBinlogCoordinate != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -24,22 +25,32 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.resync.CascadingActionDetector;

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -152,6 +163,58 @@ void test_given_UpdateRows_event_then_calls_correct_handler(EventType eventType)
verify(objectUnderTest).handleUpdateEvent(binlogEvent);
}

@ParameterizedTest
@EnumSource(names = {"UPDATE_ROWS", "EXT_UPDATE_ROWS"})
void test_given_UpdateRows_event_when_primary_key_changes_then_generate_correct_events(EventType eventType) throws NoSuchFieldException, IllegalAccessException {
final UpdateRowsEventData data = mock(UpdateRowsEventData.class);
final Serializable[] oldCol1Data = new Serializable[]{1, "a"};
final Serializable[] newCol1Data = new Serializable[]{2, "a"};
final Serializable[] oldCol2Data = new Serializable[]{3, "b"};
final Serializable[] newCol2Data = new Serializable[]{1, "b"};
final List<Map.Entry<Serializable[], Serializable[]>> rows = List.of(
Map.entry(oldCol1Data, newCol1Data),
Map.entry(oldCol2Data, newCol2Data)
);
final long tableId = 1234L;
when(binlogEvent.getHeader().getEventType()).thenReturn(eventType);
when(binlogEvent.getData()).thenReturn(data);
when(data.getTableId()).thenReturn(tableId);
when(objectUnderTest.isValidTableId(tableId)).thenReturn(true);
when(data.getRows()).thenReturn(rows);

// Set tableMetadataMap reflectively
final TableMetadata tableMetadata = mock(TableMetadata.class);
final Map<Long, TableMetadata> tableMetadataMap = Map.of(tableId, tableMetadata);
Field tableMetadataMapField = BinlogEventListener.class.getDeclaredField("tableMetadataMap");
tableMetadataMapField.setAccessible(true);
tableMetadataMapField.set(objectUnderTest, tableMetadataMap);
when(tableMetadata.getPrimaryKeys()).thenReturn(List.of("col1"));
when(tableMetadata.getColumnNames()).thenReturn(List.of("col1", "col2"));

objectUnderTest.onEvent(binlogEvent);

verifyHandlerCallHelper();
verify(objectUnderTest).handleUpdateEvent(binlogEvent);

// verify rowList and bulkActionList that were sent to handleRowChangeEvent() were correct
ArgumentCaptor<List<Serializable[]>> rowListArgumentCaptor = ArgumentCaptor.forClass(List.class);
ArgumentCaptor<List<OpenSearchBulkActions>> bulkActionListArgumentCaptor = ArgumentCaptor.forClass(List.class);
verify(objectUnderTest).handleRowChangeEvent(eq(binlogEvent), eq(tableId), rowListArgumentCaptor.capture(), bulkActionListArgumentCaptor.capture());
List<Serializable[]> rowList = rowListArgumentCaptor.getValue();
List<OpenSearchBulkActions> bulkActionList = bulkActionListArgumentCaptor.getValue();

assertThat(rowList.size(), is(4));
assertThat(bulkActionList.size(), is(4));
assertThat(rowList.get(0), is(oldCol1Data));
assertThat(bulkActionList.get(0), is(OpenSearchBulkActions.DELETE));
assertThat(rowList.get(1), is(newCol1Data));
assertThat(bulkActionList.get(1), is(OpenSearchBulkActions.INDEX));
assertThat(rowList.get(2), is(oldCol2Data));
assertThat(bulkActionList.get(2), is(OpenSearchBulkActions.DELETE));
assertThat(rowList.get(3), is(newCol2Data));
assertThat(bulkActionList.get(3), is(OpenSearchBulkActions.INDEX));
}

@ParameterizedTest
@EnumSource(names = {"DELETE_ROWS", "EXT_DELETE_ROWS"})
void test_given_DeleteRows_event_then_calls_correct_handler(EventType eventType) {
Expand Down

0 comments on commit e725ce6

Please sign in to comment.