
Move event processing to separate threads and add event processing timer in RDS source (#4914)

* Move event processing to separate threads and add event processing timer

Signed-off-by: Hai Yan <[email protected]>

* Address comments

Signed-off-by: Hai Yan <[email protected]>

---------

Signed-off-by: Hai Yan <[email protected]>
oeyh authored Sep 5, 2024
1 parent 608fd15 commit 6bde685
Showing 4 changed files with 90 additions and 16 deletions.
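The change follows a common fan-out pattern: the binlog client's single callback thread hands each event off to a fixed-size worker pool, and every handler invocation is wrapped in a Micrometer Timer. A minimal sketch of that pattern, with illustrative names rather than the plugin's actual classes:

    import io.micrometer.core.instrument.Metrics;
    import io.micrometer.core.instrument.Timer;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Consumer;

    class TimedEventDispatcher<E> {
        private final ExecutorService workers = Executors.newFixedThreadPool(4); // sized by the new "workers" setting
        private final Timer timer = Metrics.timer("replicationLogEntryProcessingTime");

        // Submit the handler to a worker thread; the timer records how long each handler ran.
        void dispatch(E event, Consumer<E> handler) {
            workers.submit(() -> {
                try {
                    timer.record(() -> handler.accept(event));
                } catch (Exception e) {
                    System.err.println("Failed to process event: " + e);
                }
            });
        }
    }

Handlers no longer block the binlog read loop; the trade-off is that strict event ordering is no longer guaranteed once more than one worker is configured.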
StreamConfig.java
@@ -12,13 +12,23 @@
 public class StreamConfig {
 
     private static final int DEFAULT_S3_FOLDER_PARTITION_COUNT = 100;
+    private static final int DEFAULT_NUM_WORKERS = 1;
 
     @JsonProperty("partition_count")
     @Min(1)
     @Max(1000)
     private int s3FolderPartitionCount = DEFAULT_S3_FOLDER_PARTITION_COUNT;
 
+    @JsonProperty("workers")
+    @Min(1)
+    @Max(1000)
+    private int numWorkers = DEFAULT_NUM_WORKERS;
+
     public int getPartitionCount() {
         return s3FolderPartitionCount;
     }
+
+    public int getNumWorkers() {
+        return numWorkers;
+    }
 }
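The new workers option defaults to 1 and is bound from pipeline configuration by Jackson. A stand-alone sketch of the binding behavior (hypothetical usage; Data Prepper performs this wiring itself, and the @Min/@Max bounds are enforced by Bean Validation rather than by Jackson during deserialization):

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class StreamConfigExample {
        public static void main(String[] args) throws Exception {
            final ObjectMapper mapper = new ObjectMapper();
            // "workers" binds to numWorkers via @JsonProperty; omitted keys keep their defaults
            final StreamConfig config = mapper.readValue("{\"workers\": 4}", StreamConfig.class);
            System.out.println(config.getNumWorkers());     // 4
            System.out.println(config.getPartitionCount()); // 100, the default
        }
    }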
BinlogEventListener.java
@@ -16,7 +16,9 @@
 import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.Timer;
 import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
+import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -38,6 +40,8 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -51,6 +55,7 @@ public class BinlogEventListener implements BinaryLogClient.EventListener {
     static final String CHANGE_EVENTS_PROCESSING_ERROR_COUNT = "changeEventsProcessingErrors";
     static final String BYTES_RECEIVED = "bytesReceived";
     static final String BYTES_PROCESSED = "bytesProcessed";
+    static final String REPLICATION_LOG_EVENT_PROCESSING_TIME = "replicationLogEntryProcessingTime";
 
     /**
      * TableId to TableMetadata mapping
@@ -59,17 +64,20 @@ public class BinlogEventListener implements BinaryLogClient.EventListener {
 
     private final StreamRecordConverter recordConverter;
     private final BinaryLogClient binaryLogClient;
-    private final BufferAccumulator<Record<Event>> bufferAccumulator;
+    private final Buffer<Record<Event>> buffer;
     private final List<String> tableNames;
     private final String s3Prefix;
     private final boolean isAcknowledgmentsEnabled;
     private final PluginMetrics pluginMetrics;
     private final List<Event> pipelineEvents;
     private final StreamCheckpointManager streamCheckpointManager;
+    private final ExecutorService binlogEventExecutorService;
     private final Counter changeEventSuccessCounter;
     private final Counter changeEventErrorCounter;
     private final DistributionSummary bytesReceivedSummary;
     private final DistributionSummary bytesProcessedSummary;
+    private final Timer eventProcessingTimer;
+
 
     /**
      * currentBinlogCoordinate is the coordinate where next event will start
@@ -82,15 +90,17 @@ public BinlogEventListener(final Buffer<Record<Event>> buffer,
                               final BinaryLogClient binaryLogClient,
                               final StreamCheckpointer streamCheckpointer,
                               final AcknowledgementSetManager acknowledgementSetManager) {
+        this.buffer = buffer;
         this.binaryLogClient = binaryLogClient;
         tableMetadataMap = new HashMap<>();
         recordConverter = new StreamRecordConverter(sourceConfig.getStream().getPartitionCount());
-        bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
         s3Prefix = sourceConfig.getS3Prefix();
         tableNames = sourceConfig.getTableNames();
         isAcknowledgmentsEnabled = sourceConfig.isAcknowledgmentsEnabled();
         this.pluginMetrics = pluginMetrics;
         pipelineEvents = new ArrayList<>();
+        binlogEventExecutorService = Executors.newFixedThreadPool(
+                sourceConfig.getStream().getNumWorkers(), BackgroundThreadFactory.defaultExecutorThreadFactory("rds-source-binlog-processor"));
 
         this.streamCheckpointManager = new StreamCheckpointManager(
                 streamCheckpointer, sourceConfig.isAcknowledgmentsEnabled(),
@@ -101,6 +111,7 @@ public BinlogEventListener(final Buffer<Record<Event>> buffer,
         changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT);
         bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
         bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
+        eventProcessingTimer = pluginMetrics.timer(REPLICATION_LOG_EVENT_PROCESSING_TIME);
     }
 
     @Override
@@ -109,29 +120,30 @@ public void onEvent(com.github.shyiko.mysql.binlog.event.Event event) {
 
         switch (eventType) {
             case ROTATE:
-                handleEventAndErrors(event, this::handleRotateEvent);
+                processEvent(event, this::handleRotateEvent);
                 break;
             case TABLE_MAP:
-                handleEventAndErrors(event, this::handleTableMapEvent);
+                processEvent(event, this::handleTableMapEvent);
                 break;
             case WRITE_ROWS:
             case EXT_WRITE_ROWS:
-                handleEventAndErrors(event, this::handleInsertEvent);
+                processEvent(event, this::handleInsertEvent);
                 break;
             case UPDATE_ROWS:
             case EXT_UPDATE_ROWS:
-                handleEventAndErrors(event, this::handleUpdateEvent);
+                processEvent(event, this::handleUpdateEvent);
                 break;
             case DELETE_ROWS:
            case EXT_DELETE_ROWS:
-                handleEventAndErrors(event, this::handleDeleteEvent);
+                processEvent(event, this::handleDeleteEvent);
                 break;
         }
     }
 
     public void stopClient() {
         try {
             binaryLogClient.disconnect();
+            binlogEventExecutorService.shutdownNow();
             LOG.info("Binary log client disconnected.");
         } catch (Exception e) {
             LOG.error("Binary log client failed to disconnect.", e);
@@ -150,6 +162,7 @@ void handleRotateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
             }
         }
     }
+
     void handleTableMapEvent(com.github.shyiko.mysql.binlog.event.Event event) {
         final TableMapEventData data = event.getData();
         final TableMapEventMetadata tableMapEventMetadata = data.getEventMetadata();
@@ -223,6 +236,8 @@ private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event eve
         final List<String> primaryKeys = tableMetadata.getPrimaryKeys();
         final long eventTimestampMillis = event.getHeader().getTimestamp();
 
+        final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
+
         for (Object[] rowDataArray : rows) {
             final Map<String, Object> rowDataMap = new HashMap<>();
             for (int i = 0; i < rowDataArray.length; i++) {
@@ -242,7 +257,7 @@ private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Eve
             pipelineEvents.add(pipelineEvent);
         }
 
-        writeToBuffer(acknowledgementSet);
+        writeToBuffer(bufferAccumulator, acknowledgementSet);
         bytesProcessedSummary.record(bytes);
 
         if (isAcknowledgmentsEnabled) {
@@ -256,27 +271,27 @@ private boolean isTableOfInterest(String tableName) {
         return new HashSet<>(tableNames).contains(tableName);
     }
 
-    private void writeToBuffer(AcknowledgementSet acknowledgementSet) {
+    private void writeToBuffer(BufferAccumulator<Record<Event>> bufferAccumulator, AcknowledgementSet acknowledgementSet) {
         for (Event pipelineEvent : pipelineEvents) {
-            addToBufferAccumulator(new Record<>(pipelineEvent));
+            addToBufferAccumulator(bufferAccumulator, new Record<>(pipelineEvent));
             if (acknowledgementSet != null) {
                 acknowledgementSet.add(pipelineEvent);
             }
         }
 
-        flushBufferAccumulator(pipelineEvents.size());
+        flushBufferAccumulator(bufferAccumulator, pipelineEvents.size());
         pipelineEvents.clear();
     }
 
-    private void addToBufferAccumulator(final Record<Event> record) {
+    private void addToBufferAccumulator(final BufferAccumulator<Record<Event>> bufferAccumulator, final Record<Event> record) {
         try {
             bufferAccumulator.add(record);
         } catch (Exception e) {
             LOG.error("Failed to add event to buffer", e);
         }
     }
 
-    private void flushBufferAccumulator(int eventCount) {
+    private void flushBufferAccumulator(BufferAccumulator<Record<Event>> bufferAccumulator, int eventCount) {
         try {
             bufferAccumulator.flush();
             changeEventSuccessCounter.increment(eventCount);
@@ -288,10 +303,14 @@ private void flushBufferAccumulator(int eventCount) {
         }
     }
 
+    private void processEvent(com.github.shyiko.mysql.binlog.event.Event event, Consumer<com.github.shyiko.mysql.binlog.event.Event> function) {
+        binlogEventExecutorService.submit(() -> handleEventAndErrors(event, function));
+    }
+
     private void handleEventAndErrors(com.github.shyiko.mysql.binlog.event.Event event,
                                       Consumer<com.github.shyiko.mysql.binlog.event.Event> function) {
         try {
-            function.accept(event);
+            eventProcessingTimer.record(() -> function.accept(event));
         } catch (Exception e) {
             LOG.error("Failed to process change event of type {}", event.getHeader().getEventType(), e);
         }
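Two details in the listing above are worth calling out. First, the BufferAccumulator moves from a shared field to a local created per handleRowChangeEvent call, presumably so that handlers running concurrently on the worker pool do not share mutable batching state. Second, Timer.record(Runnable) measures the wrapped call and records the elapsed time even if the handler throws, so the surrounding try/catch still sees the failure. A small self-contained illustration of the timer behavior, using a local SimpleMeterRegistry in place of PluginMetrics:

    import io.micrometer.core.instrument.Timer;
    import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

    import java.util.concurrent.TimeUnit;

    public class TimerExample {
        public static void main(String[] args) {
            final SimpleMeterRegistry registry = new SimpleMeterRegistry();
            final Timer timer = registry.timer("replicationLogEntryProcessingTime");

            // record(Runnable) times the wrapped call, mirroring handleEventAndErrors above
            timer.record(TimerExample::simulateHandler);

            System.out.println(timer.count());                          // 1
            System.out.println(timer.totalTime(TimeUnit.MILLISECONDS)); // roughly 5
        }

        private static void simulateHandler() {
            try {
                Thread.sleep(5);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }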
BinlogEventListenerTest.java
@@ -7,13 +7,17 @@
 
 import com.github.shyiko.mysql.binlog.BinaryLogClient;
 import com.github.shyiko.mysql.binlog.event.EventType;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Timer;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.Answers;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
+import org.mockito.MockedStatic;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -22,10 +26,18 @@
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.opensearch.dataprepper.plugins.source.rds.stream.BinlogEventListener.REPLICATION_LOG_EVENT_PROCESSING_TIME;
 
 @ExtendWith(MockitoExtension.class)
 class BinlogEventListenerTest {
@@ -48,14 +60,32 @@ class BinlogEventListenerTest {
     @Mock
     private AcknowledgementSetManager acknowledgementSetManager;
 
+    @Mock
+    private ExecutorService eventListnerExecutorService;
+
+    @Mock
+    private ExecutorService checkpointManagerExecutorService;
+
+    @Mock
+    private ThreadFactory threadFactory;
+
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private com.github.shyiko.mysql.binlog.event.Event binlogEvent;
 
-    private static BinlogEventListener objectUnderTest;
+    private BinlogEventListener objectUnderTest;
+
+    private Timer eventProcessingTimer;
 
     @BeforeEach
     void setUp() {
-        objectUnderTest = spy(createObjectUnderTest());
+        eventProcessingTimer = Metrics.timer("test-timer");
+        when(pluginMetrics.timer(REPLICATION_LOG_EVENT_PROCESSING_TIME)).thenReturn(eventProcessingTimer);
+        try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class)) {
+            executorsMockedStatic.when(() -> Executors.newFixedThreadPool(anyInt(), any(ThreadFactory.class))).thenReturn(eventListnerExecutorService);
+            executorsMockedStatic.when(Executors::newSingleThreadExecutor).thenReturn(checkpointManagerExecutorService);
+            executorsMockedStatic.when(Executors::defaultThreadFactory).thenReturn(threadFactory);
+            objectUnderTest = spy(createObjectUnderTest());
+        }
     }
 
     @Test
@@ -65,6 +95,7 @@ void test_given_TableMap_event_then_calls_correct_handler() {
 
         objectUnderTest.onEvent(binlogEvent);
 
+        verifyHandlerCallHelper();
         verify(objectUnderTest).handleTableMapEvent(binlogEvent);
     }
 
@@ -76,6 +107,7 @@ void test_given_WriteRows_event_then_calls_correct_handler(EventType eventType)
 
         objectUnderTest.onEvent(binlogEvent);
 
+        verifyHandlerCallHelper();
         verify(objectUnderTest).handleInsertEvent(binlogEvent);
     }
 
@@ -87,6 +119,7 @@ void test_given_UpdateRows_event_then_calls_correct_handler(EventType eventType)
 
         objectUnderTest.onEvent(binlogEvent);
 
+        verifyHandlerCallHelper();
         verify(objectUnderTest).handleUpdateEvent(binlogEvent);
     }
 
@@ -98,10 +131,19 @@ void test_given_DeleteRows_event_then_calls_correct_handler(EventType eventType)
 
         objectUnderTest.onEvent(binlogEvent);
 
+        verifyHandlerCallHelper();
         verify(objectUnderTest).handleDeleteEvent(binlogEvent);
     }
 
     private BinlogEventListener createObjectUnderTest() {
         return new BinlogEventListener(buffer, sourceConfig, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager);
     }
+
+    private void verifyHandlerCallHelper() {
+        ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
+        verify(eventListnerExecutorService).submit(runnableArgumentCaptor.capture());
+
+        Runnable capturedRunnable = runnableArgumentCaptor.getValue();
+        capturedRunnable.run();
+    }
 }
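Because onEvent now only submits work to the mocked executor, verifyHandlerCallHelper captures the submitted Runnable and runs it on the test thread; that synchronous run is what makes the verify(objectUnderTest).handleXxxEvent(...) assertions observable. And since setUp registers a real Timer, a test could also assert that running the captured task records a timing sample. A hypothetical extension, not part of this commit, assuming JUnit's assertEquals is statically imported:

    @Test
    void records_timing_sample_when_captured_task_runs() {
        when(binlogEvent.getHeader().getEventType()).thenReturn(EventType.TABLE_MAP);
        final long countBefore = eventProcessingTimer.count();

        objectUnderTest.onEvent(binlogEvent);
        verifyHandlerCallHelper();  // runs the submitted Runnable synchronously

        // handleEventAndErrors wraps each handler in eventProcessingTimer.record(...)
        assertEquals(countBefore + 1, eventProcessingTimer.count());
    }

Measuring the delta from countBefore guards against the global Metrics registry accumulating samples across tests.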
StreamSchedulerTest.java
@@ -42,6 +42,7 @@ class StreamSchedulerTest {
 
     @Mock
     private EnhancedSourceCoordinator sourceCoordinator;
+
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private RdsSourceConfig sourceConfig;
 
@@ -85,6 +86,8 @@ void test_given_stream_partition_then_start_stream() throws InterruptedException
 
         StreamWorker streamWorker = mock(StreamWorker.class);
         doNothing().when(streamWorker).processStream(streamPartition);
+        when(sourceConfig.getStream().getNumWorkers()).thenReturn(1);
+
         final ExecutorService executorService = Executors.newSingleThreadExecutor();
         executorService.submit(() -> {
             try (MockedStatic<StreamWorker> streamWorkerMockedStatic = mockStatic(StreamWorker.class)) {
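The added getNumWorkers() stub matters presumably because the stream-start path under test constructs a BinlogEventListener, whose constructor sizes its thread pool from sourceConfig.getStream().getNumWorkers(). With RETURNS_DEEP_STUBS, an unstubbed int-returning call yields 0, and a zero-sized pool is rejected, as this sketch shows:

    import java.util.concurrent.Executors;

    public class ZeroWorkersExample {
        public static void main(String[] args) {
            // newFixedThreadPool requires nThreads > 0; passing 0 throws IllegalArgumentException
            Executors.newFixedThreadPool(0);
        }
    }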
