diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/StreamConfig.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/StreamConfig.java
index c246e56b45..f2b5af11a1 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/StreamConfig.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/StreamConfig.java
@@ -12,13 +12,23 @@ public class StreamConfig {
 
     private static final int DEFAULT_S3_FOLDER_PARTITION_COUNT = 100;
+    private static final int DEFAULT_NUM_WORKERS = 1;
 
     @JsonProperty("partition_count")
     @Min(1)
     @Max(1000)
     private int s3FolderPartitionCount = DEFAULT_S3_FOLDER_PARTITION_COUNT;
 
+    @JsonProperty("workers")
+    @Min(1)
+    @Max(1000)
+    private int numWorkers = DEFAULT_NUM_WORKERS;
+
     public int getPartitionCount() {
         return s3FolderPartitionCount;
     }
+
+    public int getNumWorkers() {
+        return numWorkers;
+    }
 
 }
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java
index 181716a69a..33defb42b7 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java
@@ -16,7 +16,9 @@
 import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.Timer;
 import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
+import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -38,6 +40,8 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -51,6 +55,7 @@ public class BinlogEventListener implements BinaryLogClient.EventListener {
     static final String CHANGE_EVENTS_PROCESSING_ERROR_COUNT = "changeEventsProcessingErrors";
     static final String BYTES_RECEIVED = "bytesReceived";
     static final String BYTES_PROCESSED = "bytesProcessed";
+    static final String REPLICATION_LOG_EVENT_PROCESSING_TIME = "replicationLogEntryProcessingTime";
 
     /**
      * TableId to TableMetadata mapping
@@ -59,17 +64,20 @@ public class BinlogEventListener implements BinaryLogClient.EventListener {
 
     private final StreamRecordConverter recordConverter;
     private final BinaryLogClient binaryLogClient;
-    private final BufferAccumulator<Record<Event>> bufferAccumulator;
+    private final Buffer<Record<Event>> buffer;
     private final List<String> tableNames;
     private final String s3Prefix;
     private final boolean isAcknowledgmentsEnabled;
     private final PluginMetrics pluginMetrics;
     private final List<Event> pipelineEvents;
     private final StreamCheckpointManager streamCheckpointManager;
+    private final ExecutorService binlogEventExecutorService;
 
     private final Counter changeEventSuccessCounter;
     private final Counter changeEventErrorCounter;
     private final DistributionSummary bytesReceivedSummary;
     private final DistributionSummary bytesProcessedSummary;
+    private final Timer eventProcessingTimer;
+
     /**
      * currentBinlogCoordinate is the coordinate where next event will start
@@ -82,15 +90,17 @@ public BinlogEventListener(final Buffer<Record<Event>> buffer,
                                final BinaryLogClient binaryLogClient,
                                final StreamCheckpointer streamCheckpointer,
                                final AcknowledgementSetManager acknowledgementSetManager) {
+        this.buffer = buffer;
         this.binaryLogClient = binaryLogClient;
         tableMetadataMap = new HashMap<>();
         recordConverter = new StreamRecordConverter(sourceConfig.getStream().getPartitionCount());
-        bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
         s3Prefix = sourceConfig.getS3Prefix();
         tableNames = sourceConfig.getTableNames();
         isAcknowledgmentsEnabled = sourceConfig.isAcknowledgmentsEnabled();
         this.pluginMetrics = pluginMetrics;
         pipelineEvents = new ArrayList<>();
+        binlogEventExecutorService = Executors.newFixedThreadPool(
+                sourceConfig.getStream().getNumWorkers(), BackgroundThreadFactory.defaultExecutorThreadFactory("rds-source-binlog-processor"));
 
         this.streamCheckpointManager = new StreamCheckpointManager(
                 streamCheckpointer, sourceConfig.isAcknowledgmentsEnabled(),
@@ -101,6 +111,7 @@ public BinlogEventListener(final Buffer<Record<Event>> buffer,
         changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT);
         bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
         bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
+        eventProcessingTimer = pluginMetrics.timer(REPLICATION_LOG_EVENT_PROCESSING_TIME);
     }
 
     @Override
@@ -109,22 +120,22 @@ public void onEvent(com.github.shyiko.mysql.binlog.event.Event event) {
 
         switch (eventType) {
             case ROTATE:
-                handleEventAndErrors(event, this::handleRotateEvent);
+                processEvent(event, this::handleRotateEvent);
                 break;
             case TABLE_MAP:
-                handleEventAndErrors(event, this::handleTableMapEvent);
+                processEvent(event, this::handleTableMapEvent);
                 break;
             case WRITE_ROWS:
             case EXT_WRITE_ROWS:
-                handleEventAndErrors(event, this::handleInsertEvent);
+                processEvent(event, this::handleInsertEvent);
                 break;
             case UPDATE_ROWS:
             case EXT_UPDATE_ROWS:
-                handleEventAndErrors(event, this::handleUpdateEvent);
+                processEvent(event, this::handleUpdateEvent);
                 break;
             case DELETE_ROWS:
             case EXT_DELETE_ROWS:
-                handleEventAndErrors(event, this::handleDeleteEvent);
+                processEvent(event, this::handleDeleteEvent);
                 break;
         }
     }
@@ -132,6 +143,7 @@ public void onEvent(com.github.shyiko.mysql.binlog.event.Event event) {
     public void stopClient() {
         try {
             binaryLogClient.disconnect();
+            binlogEventExecutorService.shutdownNow();
             LOG.info("Binary log client disconnected.");
         } catch (Exception e) {
             LOG.error("Binary log client failed to disconnect.", e);
@@ -150,6 +162,7 @@ void handleRotateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
             }
         }
     }
+
     void handleTableMapEvent(com.github.shyiko.mysql.binlog.event.Event event) {
         final TableMapEventData data = event.getData();
         final TableMapEventMetadata tableMapEventMetadata = data.getEventMetadata();
@@ -223,6 +236,8 @@ private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event eve
         final List<String> primaryKeys = tableMetadata.getPrimaryKeys();
         final long eventTimestampMillis = event.getHeader().getTimestamp();
 
+        final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
+
         for (Object[] rowDataArray : rows) {
             final Map<String, Object> rowDataMap = new HashMap<>();
             for (int i = 0; i < rowDataArray.length; i++) {
@@ -242,7 +257,7 @@ private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event eve
             pipelineEvents.add(pipelineEvent);
         }
 
-        writeToBuffer(acknowledgementSet);
+        writeToBuffer(bufferAccumulator, acknowledgementSet);
         bytesProcessedSummary.record(bytes);
 
         if (isAcknowledgmentsEnabled) {
@@ -256,19 +271,19 @@ private boolean isTableOfInterest(String tableName) {
         return new HashSet<>(tableNames).contains(tableName);
     }
 
-    private void writeToBuffer(AcknowledgementSet acknowledgementSet) {
+    private void writeToBuffer(BufferAccumulator<Record<Event>> bufferAccumulator, AcknowledgementSet acknowledgementSet) {
         for (Event pipelineEvent : pipelineEvents) {
-            addToBufferAccumulator(new Record<>(pipelineEvent));
+            addToBufferAccumulator(bufferAccumulator, new Record<>(pipelineEvent));
             if (acknowledgementSet != null) {
                 acknowledgementSet.add(pipelineEvent);
             }
         }
 
-        flushBufferAccumulator(pipelineEvents.size());
+        flushBufferAccumulator(bufferAccumulator, pipelineEvents.size());
         pipelineEvents.clear();
     }
 
-    private void addToBufferAccumulator(final Record<Event> record) {
+    private void addToBufferAccumulator(final BufferAccumulator<Record<Event>> bufferAccumulator, final Record<Event> record) {
         try {
             bufferAccumulator.add(record);
         } catch (Exception e) {
@@ -276,7 +291,7 @@ private void addToBufferAccumulator(final Record<Event> record) {
         }
     }
 
-    private void flushBufferAccumulator(int eventCount) {
+    private void flushBufferAccumulator(BufferAccumulator<Record<Event>> bufferAccumulator, int eventCount) {
         try {
             bufferAccumulator.flush();
             changeEventSuccessCounter.increment(eventCount);
@@ -288,10 +303,14 @@ private void flushBufferAccumulator(int eventCount) {
         }
     }
 
+    private void processEvent(com.github.shyiko.mysql.binlog.event.Event event, Consumer<com.github.shyiko.mysql.binlog.event.Event> function) {
+        binlogEventExecutorService.submit(() -> handleEventAndErrors(event, function));
+    }
+
     private void handleEventAndErrors(com.github.shyiko.mysql.binlog.event.Event event,
                                       Consumer<com.github.shyiko.mysql.binlog.event.Event> function) {
         try {
-            function.accept(event);
+            eventProcessingTimer.record(() -> function.accept(event));
         } catch (Exception e) {
             LOG.error("Failed to process change event of type {}", event.getHeader().getEventType(), e);
         }
diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java
index d33ecd3a70..c287ec2f00 100644
--- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java
+++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java
@@ -7,13 +7,17 @@
 
 import com.github.shyiko.mysql.binlog.BinaryLogClient;
 import com.github.shyiko.mysql.binlog.event.EventType;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Timer;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.Answers;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
+import org.mockito.MockedStatic;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -22,10 +26,18 @@
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.opensearch.dataprepper.plugins.source.rds.stream.BinlogEventListener.REPLICATION_LOG_EVENT_PROCESSING_TIME;
 
 @ExtendWith(MockitoExtension.class)
 class BinlogEventListenerTest {
@@ -48,14 +60,32 @@ class BinlogEventListenerTest {
     @Mock
     private AcknowledgementSetManager acknowledgementSetManager;
 
+    @Mock
+    private ExecutorService eventListenerExecutorService;
+
+    @Mock
+    private ExecutorService checkpointManagerExecutorService;
+
+    @Mock
+    private ThreadFactory threadFactory;
+
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private com.github.shyiko.mysql.binlog.event.Event binlogEvent;
 
-    private static BinlogEventListener objectUnderTest;
+    private BinlogEventListener objectUnderTest;
+
+    private Timer eventProcessingTimer;
 
     @BeforeEach
     void setUp() {
-        objectUnderTest = spy(createObjectUnderTest());
+        eventProcessingTimer = Metrics.timer("test-timer");
+        when(pluginMetrics.timer(REPLICATION_LOG_EVENT_PROCESSING_TIME)).thenReturn(eventProcessingTimer);
+        try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class)) {
+            executorsMockedStatic.when(() -> Executors.newFixedThreadPool(anyInt(), any(ThreadFactory.class))).thenReturn(eventListenerExecutorService);
+            executorsMockedStatic.when(Executors::newSingleThreadExecutor).thenReturn(checkpointManagerExecutorService);
+            executorsMockedStatic.when(Executors::defaultThreadFactory).thenReturn(threadFactory);
+            objectUnderTest = spy(createObjectUnderTest());
+        }
     }
 
     @Test
@@ -65,6 +95,7 @@ void test_given_TableMap_event_then_calls_correct_handler() {
 
         objectUnderTest.onEvent(binlogEvent);
 
+        verifyHandlerCallHelper();
         verify(objectUnderTest).handleTableMapEvent(binlogEvent);
     }
 
@@ -76,6 +107,7 @@ void test_given_WriteRows_event_then_calls_correct_handler(EventType eventType)
 
         objectUnderTest.onEvent(binlogEvent);
 
+        verifyHandlerCallHelper();
         verify(objectUnderTest).handleInsertEvent(binlogEvent);
     }
 
@@ -87,6 +119,7 @@ void test_given_UpdateRows_event_then_calls_correct_handler(EventType eventType)
 
         objectUnderTest.onEvent(binlogEvent);
 
+        verifyHandlerCallHelper();
         verify(objectUnderTest).handleUpdateEvent(binlogEvent);
     }
 
@@ -98,10 +131,19 @@ void test_given_DeleteRows_event_then_calls_correct_handler(EventType eventType)
 
         objectUnderTest.onEvent(binlogEvent);
 
+        verifyHandlerCallHelper();
         verify(objectUnderTest).handleDeleteEvent(binlogEvent);
     }
 
     private BinlogEventListener createObjectUnderTest() {
         return new BinlogEventListener(buffer, sourceConfig, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager);
     }
+
+    private void verifyHandlerCallHelper() {
+        ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
+        verify(eventListenerExecutorService).submit(runnableArgumentCaptor.capture());
+
+        Runnable capturedRunnable = runnableArgumentCaptor.getValue();
+        capturedRunnable.run();
+    }
 }
\ No newline at end of file
diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamSchedulerTest.java
index 3c8b70cab2..0414495f6c 100644
--- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamSchedulerTest.java
+++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamSchedulerTest.java
@@ -42,6 +42,7 @@ class StreamSchedulerTest {
 
     @Mock
     private EnhancedSourceCoordinator sourceCoordinator;
 
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private RdsSourceConfig sourceConfig;
 
@@ -85,6 +86,8 @@ void test_given_stream_partition_then_start_stream() throws InterruptedException
         StreamWorker streamWorker = mock(StreamWorker.class);
         doNothing().when(streamWorker).processStream(streamPartition);
 
+        when(sourceConfig.getStream().getNumWorkers()).thenReturn(1);
+
         final ExecutorService executorService = Executors.newSingleThreadExecutor();
         executorService.submit(() -> {
             try (MockedStatic<StreamWorker> streamWorkerMockedStatic = mockStatic(StreamWorker.class)) {
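For readers skimming the diff, the core of this change is a dispatch pattern: each binlog handler is submitted to a fixed-size pool sized by the new `workers` stream setting, every invocation is wrapped in a Micrometer `Timer`, and `handleRowChangeEvent` now builds a `BufferAccumulator` per call so concurrent handlers do not share one. The following is a minimal, self-contained sketch of that pattern only; it is not code from the plugin, and the class name, pool size, and `String` event type are hypothetical stand-ins.

```java
// Illustrative sketch of the worker-pool-plus-timer dispatch pattern used above.
// Not part of the PR; names and sizes are placeholders.
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class EventDispatchSketch {
    // In the plugin, the pool size comes from the "workers" setting and the timer from PluginMetrics.
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private final Timer processingTimer =
            new SimpleMeterRegistry().timer("replicationLogEntryProcessingTime");

    // Hand the event off so the caller's thread is not blocked, and time the handler.
    public void processEvent(String event, Consumer<String> handler) {
        workers.submit(() -> processingTimer.record(() -> handler.accept(event)));
    }

    public static void main(String[] args) {
        EventDispatchSketch sketch = new EventDispatchSketch();
        sketch.processEvent("row-change", e -> System.out.println("handled " + e));
        sketch.workers.shutdown(); // the listener's stopClient() uses shutdownNow() instead
    }
}
```

The unit tests above verify this hand-off by mocking `Executors.newFixedThreadPool`, capturing the submitted `Runnable`, and running it inline before asserting the handler was called.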