From 567842dabd57a039b718166b5596676e87db3305 Mon Sep 17 00:00:00 2001 From: Hai Yan <8153134+oeyh@users.noreply.github.com> Date: Tue, 30 Jul 2024 12:09:20 -0500 Subject: [PATCH] Add rds source metrics (#4769) * Add rds source metrics Signed-off-by: Hai Yan * Remove unused imports Signed-off-by: Hai Yan * Add exportS3ObjectsErrors metric Signed-off-by: Hai Yan --------- Signed-off-by: Hai Yan Signed-off-by: Krishna Kondaka --- data-prepper-plugins/rds-source/build.gradle | 3 + .../plugins/source/rds/RdsService.java | 2 +- .../source/rds/export/DataFileLoader.java | 46 +++++- .../source/rds/export/DataFileScheduler.java | 24 ++- .../source/rds/export/ExportScheduler.java | 27 +++- .../rds/stream/BinlogEventListener.java | 52 ++++++- .../source/rds/stream/StreamScheduler.java | 3 +- .../source/rds/export/DataFileLoaderTest.java | 137 ++++++++++++++++-- .../rds/export/DataFileSchedulerTest.java | 63 +++++++- .../rds/export/ExportSchedulerTest.java | 82 ++++++++++- .../rds/stream/BinlogEventListenerTest.java | 6 +- 11 files changed, 404 insertions(+), 41 deletions(-) diff --git a/data-prepper-plugins/rds-source/build.gradle b/data-prepper-plugins/rds-source/build.gradle index 6d6a681646..14c851f645 100644 --- a/data-prepper-plugins/rds-source/build.gradle +++ b/data-prepper-plugins/rds-source/build.gradle @@ -26,4 +26,7 @@ dependencies { testImplementation project(path: ':data-prepper-test-common') testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' testImplementation project(path: ':data-prepper-test-event') + testImplementation libs.avro.core + testImplementation libs.parquet.hadoop + testImplementation libs.parquet.avro } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java index 982751a3db..73b71e0085 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java @@ -88,7 +88,7 @@ public void start(Buffer> buffer) { exportScheduler = new ExportScheduler( sourceCoordinator, snapshotManager, exportTaskManager, s3Client, pluginMetrics); dataFileScheduler = new DataFileScheduler( - sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer); + sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics); runnableList.add(exportScheduler); runnableList.add(dataFileScheduler); } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java index f12d44a75f..42a5b3a0d5 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java @@ -5,7 +5,10 @@ package org.opensearch.dataprepper.plugins.source.rds.export; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.codec.InputCodec; import 
org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; @@ -18,12 +21,18 @@ import java.io.InputStream; import java.time.Duration; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; public class DataFileLoader implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(DataFileLoader.class); static final Duration VERSION_OVERLAP_TIME_FOR_EXPORT = Duration.ofMinutes(5); + static final String EXPORT_RECORDS_TOTAL_COUNT = "exportRecordsTotal"; + static final String EXPORT_RECORDS_PROCESSED_COUNT = "exportRecordsProcessed"; + static final String EXPORT_RECORDS_PROCESSING_ERROR_COUNT = "exportRecordsProcessingErrors"; + static final String BYTES_RECEIVED = "bytesReceived"; + static final String BYTES_PROCESSED = "bytesProcessed"; private final DataFilePartition dataFilePartition; private final String bucket; @@ -32,12 +41,18 @@ public class DataFileLoader implements Runnable { private final InputCodec codec; private final BufferAccumulator> bufferAccumulator; private final ExportRecordConverter recordConverter; + private final Counter exportRecordsTotalCounter; + private final Counter exportRecordSuccessCounter; + private final Counter exportRecordErrorCounter; + private final DistributionSummary bytesReceivedSummary; + private final DistributionSummary bytesProcessedSummary; private DataFileLoader(final DataFilePartition dataFilePartition, - final InputCodec codec, - final BufferAccumulator> bufferAccumulator, - final S3ObjectReader objectReader, - final ExportRecordConverter recordConverter) { + final InputCodec codec, + final BufferAccumulator> bufferAccumulator, + final S3ObjectReader objectReader, + final ExportRecordConverter recordConverter, + final PluginMetrics pluginMetrics) { this.dataFilePartition = dataFilePartition; bucket = dataFilePartition.getBucket(); objectKey = dataFilePartition.getKey(); @@ -45,24 +60,37 @@ private DataFileLoader(final DataFilePartition dataFilePartition, this.codec = codec; this.bufferAccumulator = bufferAccumulator; this.recordConverter = recordConverter; + + exportRecordsTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT); + exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT); + exportRecordErrorCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSING_ERROR_COUNT); + bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED); + bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED); } public static DataFileLoader create(final DataFilePartition dataFilePartition, final InputCodec codec, final BufferAccumulator> bufferAccumulator, final S3ObjectReader objectReader, - final ExportRecordConverter recordConverter) { - return new DataFileLoader(dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter); + final ExportRecordConverter recordConverter, + final PluginMetrics pluginMetrics) { + return new DataFileLoader(dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter, pluginMetrics); } @Override public void run() { LOG.info("Start loading s3://{}/{}", bucket, objectKey); + AtomicLong eventCount = new AtomicLong(); try (InputStream inputStream = objectReader.readFile(bucket, objectKey)) { - codec.parse(inputStream, record -> { try { + exportRecordsTotalCounter.increment(); + final Event event = record.getData(); + final String string = event.toJsonString(); + final long bytes = string.getBytes().length; + bytesReceivedSummary.record(bytes); + DataFileProgressState progressState = 
dataFilePartition.getProgressState().get(); // TODO: primary key to be obtained by querying database schema @@ -79,6 +107,8 @@ public void run() { snapshotTime, eventVersionNumber)); bufferAccumulator.add(transformedRecord); + eventCount.getAndIncrement(); + bytesProcessedSummary.record(bytes); } catch (Exception e) { throw new RuntimeException(e); } @@ -92,8 +122,10 @@ public void run() { try { bufferAccumulator.flush(); + exportRecordSuccessCounter.increment(eventCount.get()); } catch (Exception e) { LOG.error("Failed to write events to buffer", e); + exportRecordErrorCounter.increment(eventCount.get()); } } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java index 0a2b2fb638..f766aec3d2 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java @@ -5,7 +5,9 @@ package org.opensearch.dataprepper.plugins.source.rds.export; +import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; @@ -48,6 +50,9 @@ public class DataFileScheduler implements Runnable { static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60); static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000; + static final String EXPORT_S3_OBJECTS_PROCESSED_COUNT = "exportS3ObjectsProcessed"; + static final String EXPORT_S3_OBJECTS_ERROR_COUNT = "exportS3ObjectsErrors"; + static final String ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE = "activeExportS3ObjectConsumers"; private final EnhancedSourceCoordinator sourceCoordinator; @@ -57,6 +62,11 @@ public class DataFileScheduler implements Runnable { private final InputCodec codec; private final BufferAccumulator> bufferAccumulator; private final ExportRecordConverter recordConverter; + private final PluginMetrics pluginMetrics; + + private final Counter exportFileSuccessCounter; + private final Counter exportFileErrorCounter; + private final AtomicInteger activeExportS3ObjectConsumersGauge; private volatile boolean shutdownRequested = false; @@ -64,7 +74,8 @@ public DataFileScheduler(final EnhancedSourceCoordinator sourceCoordinator, final RdsSourceConfig sourceConfig, final S3Client s3Client, final EventFactory eventFactory, - final Buffer> buffer) { + final Buffer> buffer, + final PluginMetrics pluginMetrics) { this.sourceCoordinator = sourceCoordinator; this.sourceConfig = sourceConfig; codec = new ParquetInputCodec(eventFactory); @@ -72,6 +83,12 @@ public DataFileScheduler(final EnhancedSourceCoordinator sourceCoordinator, objectReader = new S3ObjectReader(s3Client); recordConverter = new ExportRecordConverter(); executor = Executors.newFixedThreadPool(DATA_LOADER_MAX_JOB_COUNT); + this.pluginMetrics = pluginMetrics; + + this.exportFileSuccessCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT); + this.exportFileErrorCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_ERROR_COUNT); + this.activeExportS3ObjectConsumersGauge = pluginMetrics.gauge( + ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE, 
numOfWorkers, AtomicInteger::get); } @Override @@ -116,15 +133,18 @@ public void shutdown() { } private void processDataFilePartition(DataFilePartition dataFilePartition) { - Runnable loader = DataFileLoader.create(dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter); + Runnable loader = DataFileLoader.create( + dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter, pluginMetrics); CompletableFuture runLoader = CompletableFuture.runAsync(loader, executor); runLoader.whenComplete((v, ex) -> { if (ex == null) { + exportFileSuccessCounter.increment(); // Update global state so we know if all s3 files have been loaded updateLoadStatus(dataFilePartition.getExportTaskId(), DEFAULT_UPDATE_LOAD_STATUS_TIMEOUT); sourceCoordinator.completePartition(dataFilePartition); } else { + exportFileErrorCounter.increment(); LOG.error("There was an exception while processing an S3 data file", ex); sourceCoordinator.giveUpPartition(dataFilePartition); } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java index 79ef3d5a61..343ade8b85 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.rds.export; +import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; @@ -42,12 +43,15 @@ public class ExportScheduler implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ExportScheduler.class); private static final int DEFAULT_TAKE_LEASE_INTERVAL_MILLIS = 60_000; - private static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMinutes(10); - private static final int DEFAULT_MAX_CLOSE_COUNT = 36; + static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMinutes(10); + static final int DEFAULT_MAX_CLOSE_COUNT = 36; private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 5 * 60_000; private static final int DEFAULT_CHECK_STATUS_INTERVAL_MILLS = 30 * 1000; private static final Duration DEFAULT_SNAPSHOT_STATUS_CHECK_TIMEOUT = Duration.ofMinutes(60); static final String PARQUET_SUFFIX = ".parquet"; + static final String EXPORT_JOB_SUCCESS_COUNT = "exportJobSuccess"; + static final String EXPORT_JOB_FAILURE_COUNT = "exportJobFailure"; + static final String EXPORT_S3_OBJECTS_TOTAL_COUNT = "exportS3ObjectsTotal"; private final S3Client s3Client; private final PluginMetrics pluginMetrics; @@ -56,6 +60,10 @@ public class ExportScheduler implements Runnable { private final ExportTaskManager exportTaskManager; private final SnapshotManager snapshotManager; + private final Counter exportJobSuccessCounter; + private final Counter exportJobFailureCounter; + private final Counter exportS3ObjectsTotalCounter; + private volatile boolean shutdownRequested = false; public ExportScheduler(final EnhancedSourceCoordinator sourceCoordinator, @@ -69,6 +77,10 @@ public ExportScheduler(final EnhancedSourceCoordinator sourceCoordinator, this.executor = 
Executors.newCachedThreadPool(); this.snapshotManager = snapshotManager; this.exportTaskManager = exportTaskManager; + + exportJobSuccessCounter = pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT); + exportJobFailureCounter = pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT); + exportS3ObjectsTotalCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_TOTAL_COUNT); } @Override @@ -133,8 +145,7 @@ private String getOrCreateExportTaskId(ExportPartition exportPartition) { progressState.setSnapshotId(snapshotInfo.getSnapshotId()); sourceCoordinator.saveProgressStateForPartition(exportPartition, null); } else { - LOG.error("The snapshot failed to create, it will be retried"); - closeExportPartitionWithError(exportPartition); + LOG.error("The snapshot failed to create. The export will be retried"); return null; } @@ -142,8 +153,7 @@ private String getOrCreateExportTaskId(ExportPartition exportPartition) { try { snapshotInfo = checkSnapshotStatus(snapshotId, DEFAULT_SNAPSHOT_STATUS_CHECK_TIMEOUT); } catch (Exception e) { - LOG.warn("Check snapshot status for {} failed", snapshotId, e); - sourceCoordinator.giveUpPartition(exportPartition); + LOG.warn("Check snapshot status for {} failed. The export will be retried", snapshotId, e); return null; } progressState.setSnapshotTime(snapshotInfo.getCreateTime().toEpochMilli()); @@ -159,7 +169,6 @@ private String getOrCreateExportTaskId(ExportPartition exportPartition) { sourceCoordinator.saveProgressStateForPartition(exportPartition, null); } else { LOG.error("The export task failed to create, it will be retried"); - closeExportPartitionWithError(exportPartition); return null; } @@ -167,6 +176,7 @@ private String getOrCreateExportTaskId(ExportPartition exportPartition) { } private void closeExportPartitionWithError(ExportPartition exportPartition) { + exportJobFailureCounter.increment(); ExportProgressState exportProgressState = exportPartition.getProgressState().get(); // Clear current task id, so that a new export can be submitted. 
exportProgressState.setExportTaskId(null); @@ -309,12 +319,15 @@ private void createDataFilePartitions(String bucket, String exportTaskId, List> bufferAccumulator; private final List tableNames; private final String s3Prefix; + private final PluginMetrics pluginMetrics; - public BinlogEventListener(final Buffer> buffer, final RdsSourceConfig sourceConfig) { + private final Counter changeEventSuccessCounter; + private final Counter changeEventErrorCounter; + private final DistributionSummary bytesReceivedSummary; + private final DistributionSummary bytesProcessedSummary; + + public BinlogEventListener(final Buffer> buffer, + final RdsSourceConfig sourceConfig, + final PluginMetrics pluginMetrics) { tableMetadataMap = new HashMap<>(); recordConverter = new StreamRecordConverter(sourceConfig.getStream().getPartitionCount()); bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); s3Prefix = sourceConfig.getS3Prefix(); tableNames = sourceConfig.getTableNames(); + this.pluginMetrics = pluginMetrics; + + changeEventSuccessCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSED_COUNT); + changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT); + bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED); + bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED); } @Override @@ -95,7 +116,9 @@ void handleTableMapEvent(com.github.shyiko.mysql.binlog.event.Event event) { } void handleInsertEvent(com.github.shyiko.mysql.binlog.event.Event event) { - // get new row data from the event + final long bytes = event.toString().getBytes().length; + bytesReceivedSummary.record(bytes); + LOG.debug("Handling insert event"); final WriteRowsEventData data = event.getData(); if (!tableMetadataMap.containsKey(data.getTableId())) { @@ -113,6 +136,7 @@ void handleInsertEvent(com.github.shyiko.mysql.binlog.event.Event event) { final long eventTimestampMillis = event.getHeader().getTimestamp(); // Construct data prepper JacksonEvent + int eventCount = 0; for (final Object[] rowDataArray : data.getRows()) { final Map rowDataMap = new HashMap<>(); for (int i = 0; i < rowDataArray.length; i++) { @@ -130,12 +154,17 @@ void handleInsertEvent(com.github.shyiko.mysql.binlog.event.Event event) { eventTimestampMillis, eventTimestampMillis); addToBuffer(new Record<>(pipelineEvent)); + eventCount++; } + bytesProcessedSummary.record(bytes); - flushBuffer(); + flushBuffer(eventCount); } void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) { + final long bytes = event.toString().getBytes().length; + bytesReceivedSummary.record(bytes); + LOG.debug("Handling update event"); final UpdateRowsEventData data = event.getData(); if (!tableMetadataMap.containsKey(data.getTableId())) { @@ -151,6 +180,7 @@ void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) { final List primaryKeys = tableMetadata.getPrimaryKeys(); final long eventTimestampMillis = event.getHeader().getTimestamp(); + int eventCount = 0; for (Map.Entry updatedRow : data.getRows()) { // updatedRow contains data before update as key and data after update as value final Object[] rowData = updatedRow.getValue(); @@ -171,12 +201,17 @@ void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) { eventTimestampMillis, eventTimestampMillis); addToBuffer(new Record<>(pipelineEvent)); + eventCount++; } + bytesProcessedSummary.record(bytes); - flushBuffer(); + flushBuffer(eventCount); } void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event 
event) { + final long bytes = event.toString().getBytes().length; + bytesReceivedSummary.record(bytes); + LOG.debug("Handling delete event"); final DeleteRowsEventData data = event.getData(); if (!tableMetadataMap.containsKey(data.getTableId())) { @@ -193,6 +228,7 @@ void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) { final List primaryKeys = tableMetadata.getPrimaryKeys(); final long eventTimestampMillis = event.getHeader().getTimestamp(); + int eventCount = 0; for (Object[] rowDataArray : data.getRows()) { final Map rowDataMap = new HashMap<>(); for (int i = 0; i < rowDataArray.length; i++) { @@ -210,9 +246,11 @@ void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) { eventTimestampMillis, eventTimestampMillis); addToBuffer(new Record<>(pipelineEvent)); + eventCount++; } + bytesProcessedSummary.record(bytes); - flushBuffer(); + flushBuffer(eventCount); } private boolean isTableOfInterest(String tableName) { @@ -227,11 +265,13 @@ private void addToBuffer(final Record record) { } } - private void flushBuffer() { + private void flushBuffer(int eventCount) { try { bufferAccumulator.flush(); + changeEventSuccessCounter.increment(eventCount); } catch (Exception e) { LOG.error("Failed to flush buffer", e); + changeEventErrorCounter.increment(eventCount); } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java index 0b42c95c38..acb4ea3f85 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java @@ -41,8 +41,9 @@ public StreamScheduler(final EnhancedSourceCoordinator sourceCoordinator, this.sourceCoordinator = sourceCoordinator; this.sourceConfig = sourceConfig; this.binaryLogClient = binaryLogClient; - this.binaryLogClient.registerEventListener(new BinlogEventListener(buffer, sourceConfig)); + this.binaryLogClient.registerEventListener(new BinlogEventListener(buffer, sourceConfig, pluginMetrics)); this.pluginMetrics = pluginMetrics; + } @Override diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java index 1ed91bc031..ccb36347fa 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java @@ -5,38 +5,61 @@ package org.opensearch.dataprepper.plugins.source.rds.export; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import org.apache.avro.generic.GenericRecord; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import 
org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.BaseEventBuilder; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventBuilder; +import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.io.InputFile; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.codec.parquet.ParquetInputCodec; import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState; import java.io.InputStream; +import java.util.Optional; import java.util.UUID; -import java.util.function.Consumer; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.rds.export.DataFileLoader.BYTES_PROCESSED; +import static org.opensearch.dataprepper.plugins.source.rds.export.DataFileLoader.BYTES_RECEIVED; +import static org.opensearch.dataprepper.plugins.source.rds.export.DataFileLoader.EXPORT_RECORDS_PROCESSED_COUNT; +import static org.opensearch.dataprepper.plugins.source.rds.export.DataFileLoader.EXPORT_RECORDS_PROCESSING_ERROR_COUNT; +import static org.opensearch.dataprepper.plugins.source.rds.export.DataFileLoader.EXPORT_RECORDS_TOTAL_COUNT; @ExtendWith(MockitoExtension.class) class DataFileLoaderTest { - @Mock + @Mock(answer = Answers.RETURNS_DEEP_STUBS) private DataFilePartition dataFilePartition; @Mock private BufferAccumulator> bufferAccumulator; @Mock - private InputCodec codec; + private EventFactory eventFactory; @Mock private S3ObjectReader s3ObjectReader; @@ -44,24 +67,120 @@ class DataFileLoaderTest { @Mock private ExportRecordConverter recordConverter; + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private Counter exportRecordsTotalCounter; + + @Mock + private Counter exportRecordSuccessCounter; + + @Mock + private Counter exportRecordErrorCounter; + + @Mock + private DistributionSummary bytesReceivedSummary; + + @Mock + private DistributionSummary bytesProcessedSummary; + + @BeforeEach + void setUp() { + when(pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT)).thenReturn(exportRecordsTotalCounter); + when(pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT)).thenReturn(exportRecordSuccessCounter); + when(pluginMetrics.counter(EXPORT_RECORDS_PROCESSING_ERROR_COUNT)).thenReturn(exportRecordErrorCounter); + when(pluginMetrics.summary(BYTES_RECEIVED)).thenReturn(bytesReceivedSummary); + when(pluginMetrics.summary(BYTES_PROCESSED)).thenReturn(bytesProcessedSummary); + } + + @Test + void test_run_success() throws Exception { + final String bucket = UUID.randomUUID().toString(); + final String key = UUID.randomUUID().toString(); + when(dataFilePartition.getBucket()).thenReturn(bucket); + when(dataFilePartition.getKey()).thenReturn(key); + final DataFileProgressState progressState = mock(DataFileProgressState.class, RETURNS_DEEP_STUBS); + 
when(dataFilePartition.getProgressState()).thenReturn(Optional.of(progressState)); + + InputStream inputStream = mock(InputStream.class); + when(s3ObjectReader.readFile(bucket, key)).thenReturn(inputStream); + + DataFileLoader dataFileLoader = createObjectUnderTest(); + + final String randomString = UUID.randomUUID().toString(); + final long sizeBytes = randomString.getBytes().length; + final BaseEventBuilder eventBuilder = mock(EventBuilder.class, RETURNS_DEEP_STUBS); + final Event event = mock(Event.class); + when(eventFactory.eventBuilder(any())).thenReturn(eventBuilder); + when(eventBuilder.withEventType(any()).withData(any()).build()).thenReturn(event); + when(event.toJsonString()).thenReturn(randomString); + + try (MockedStatic readerMockedStatic = mockStatic(AvroParquetReader.class)) { + ParquetReader parquetReader = mock(ParquetReader.class); + AvroParquetReader.Builder builder = mock(AvroParquetReader.Builder.class); + readerMockedStatic.when(() -> AvroParquetReader.builder(any(InputFile.class), any())).thenReturn(builder); + when(builder.build()).thenReturn(parquetReader); + when(parquetReader.read()).thenReturn(mock(GenericRecord.class, RETURNS_DEEP_STUBS), null); + + dataFileLoader.run(); + } + + verify(bufferAccumulator).add(any(Record.class)); + verify(bufferAccumulator).flush(); + + verify(exportRecordsTotalCounter).increment(); + verify(bytesReceivedSummary).record(sizeBytes); + verify(bytesProcessedSummary).record(sizeBytes); + verify(exportRecordSuccessCounter).increment(1); + verify(exportRecordErrorCounter, never()).increment(1); + } + @Test - void test_run() throws Exception { + void test_flush_failure_then_error_metric_updated() throws Exception { final String bucket = UUID.randomUUID().toString(); final String key = UUID.randomUUID().toString(); when(dataFilePartition.getBucket()).thenReturn(bucket); when(dataFilePartition.getKey()).thenReturn(key); + final DataFileProgressState progressState = mock(DataFileProgressState.class, RETURNS_DEEP_STUBS); + when(dataFilePartition.getProgressState()).thenReturn(Optional.of(progressState)); InputStream inputStream = mock(InputStream.class); when(s3ObjectReader.readFile(bucket, key)).thenReturn(inputStream); - DataFileLoader objectUnderTest = createObjectUnderTest(); - objectUnderTest.run(); + DataFileLoader dataFileLoader = createObjectUnderTest(); - verify(codec).parse(eq(inputStream), any(Consumer.class)); + final String randomString = UUID.randomUUID().toString(); + final long sizeBytes = randomString.getBytes().length; + final BaseEventBuilder eventBuilder = mock(EventBuilder.class, RETURNS_DEEP_STUBS); + final Event event = mock(Event.class); + when(eventFactory.eventBuilder(any())).thenReturn(eventBuilder); + when(eventBuilder.withEventType(any()).withData(any()).build()).thenReturn(event); + when(event.toJsonString()).thenReturn(randomString); + doThrow(new RuntimeException("testing")).when(bufferAccumulator).flush(); + + try (MockedStatic readerMockedStatic = mockStatic(AvroParquetReader.class)) { + ParquetReader parquetReader = mock(ParquetReader.class); + AvroParquetReader.Builder builder = mock(AvroParquetReader.Builder.class); + readerMockedStatic.when(() -> AvroParquetReader.builder(any(InputFile.class), any())).thenReturn(builder); + when(builder.build()).thenReturn(parquetReader); + when(parquetReader.read()).thenReturn(mock(GenericRecord.class, RETURNS_DEEP_STUBS), null); + + dataFileLoader.run(); + } + + verify(bufferAccumulator).add(any(Record.class)); verify(bufferAccumulator).flush(); + + 
verify(exportRecordsTotalCounter).increment(); + verify(bytesReceivedSummary).record(sizeBytes); + verify(bytesProcessedSummary).record(sizeBytes); + verify(exportRecordSuccessCounter, never()).increment(1); + verify(exportRecordErrorCounter).increment(1); } private DataFileLoader createObjectUnderTest() { - return DataFileLoader.create(dataFilePartition, codec, bufferAccumulator, s3ObjectReader, recordConverter); + final InputCodec codec = new ParquetInputCodec(eventFactory); + return DataFileLoader.create(dataFilePartition, codec, bufferAccumulator, s3ObjectReader, recordConverter, pluginMetrics); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java index ee0d0e2852..5a5a56c6fd 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.rds.export; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -12,6 +13,7 @@ import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; @@ -32,17 +34,22 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler.ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE; +import static org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler.EXPORT_S3_OBJECTS_PROCESSED_COUNT; @ExtendWith(MockitoExtension.class) class DataFileSchedulerTest { @@ -62,20 +69,37 @@ class DataFileSchedulerTest { @Mock private Buffer> buffer; + @Mock + private PluginMetrics pluginMetrics; + @Mock private DataFilePartition dataFilePartition; + @Mock + private Counter exportFileSuccessCounter; + + @Mock + private Counter exportFileErrorCounter; + + @Mock + private AtomicInteger activeExportS3ObjectConsumersGauge; + private Random random; @BeforeEach void setUp() { random = new Random(); + when(pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT)).thenReturn(exportFileSuccessCounter); + when(pluginMetrics.counter(eq(DataFileScheduler.EXPORT_S3_OBJECTS_ERROR_COUNT))).thenReturn(exportFileErrorCounter); + 
when(pluginMetrics.gauge(eq(ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE), any(AtomicInteger.class), any())) + .thenReturn(activeExportS3ObjectConsumersGauge); } @Test void test_given_no_datafile_partition_then_no_export() throws InterruptedException { when(sourceCoordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE)).thenReturn(Optional.empty()); + final DataFileScheduler objectUnderTest = createObjectUnderTest(); final ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(objectUnderTest); @@ -84,12 +108,11 @@ void test_given_no_datafile_partition_then_no_export() throws InterruptedExcepti Thread.sleep(100); executorService.shutdownNow(); - verifyNoInteractions(s3Client, buffer); + verifyNoInteractions(s3Client, buffer, exportFileSuccessCounter, activeExportS3ObjectConsumersGauge); } @Test void test_given_available_datafile_partition_then_load_datafile() { - DataFileScheduler objectUnderTest = createObjectUnderTest(); final String exportTaskId = UUID.randomUUID().toString(); when(dataFilePartition.getExportTaskId()).thenReturn(exportTaskId); @@ -100,13 +123,15 @@ void test_given_available_datafile_partition_then_load_datafile() { when(globalStatePartition.getProgressState()).thenReturn(Optional.of(loadStatusMap)); when(sourceCoordinator.getPartition(exportTaskId)).thenReturn(Optional.of(globalStatePartition)); + DataFileScheduler objectUnderTest = createObjectUnderTest(); final ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(() -> { // MockedStatic needs to be created on the same thread it's used try (MockedStatic dataFileLoaderMockedStatic = mockStatic(DataFileLoader.class)) { DataFileLoader dataFileLoader = mock(DataFileLoader.class); dataFileLoaderMockedStatic.when(() -> DataFileLoader.create( - eq(dataFilePartition), any(InputCodec.class), any(BufferAccumulator.class), any(S3ObjectReader.class), any(ExportRecordConverter.class))) + eq(dataFilePartition), any(InputCodec.class), any(BufferAccumulator.class), any(S3ObjectReader.class), + any(ExportRecordConverter.class), any(PluginMetrics.class))) .thenReturn(dataFileLoader); doNothing().when(dataFileLoader).run(); objectUnderTest.run(); @@ -116,9 +141,39 @@ void test_given_available_datafile_partition_then_load_datafile() { .untilAsserted(() -> verify(sourceCoordinator).completePartition(dataFilePartition)); executorService.shutdownNow(); + verify(exportFileSuccessCounter).increment(); + verify(exportFileErrorCounter, never()).increment(); verify(sourceCoordinator).completePartition(dataFilePartition); } + @Test + void test_data_file_loader_throws_exception_then_give_up_partition() { + + when(sourceCoordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE)).thenReturn(Optional.of(dataFilePartition)); + + DataFileScheduler objectUnderTest = createObjectUnderTest(); + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(() -> { + // MockedStatic needs to be created on the same thread it's used + try (MockedStatic dataFileLoaderMockedStatic = mockStatic(DataFileLoader.class)) { + DataFileLoader dataFileLoader = mock(DataFileLoader.class); + dataFileLoaderMockedStatic.when(() -> DataFileLoader.create( + eq(dataFilePartition), any(InputCodec.class), any(BufferAccumulator.class), any(S3ObjectReader.class), + any(ExportRecordConverter.class), any(PluginMetrics.class))) + .thenReturn(dataFileLoader); + doThrow(new RuntimeException()).when(dataFileLoader).run(); + objectUnderTest.run(); 
+ } + }); + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> verify(sourceCoordinator).giveUpPartition(dataFilePartition)); + executorService.shutdownNow(); + + verify(exportFileSuccessCounter, never()).increment(); + verify(exportFileErrorCounter).increment(); + verify(sourceCoordinator).giveUpPartition(dataFilePartition); + } + @Test void test_shutdown() { DataFileScheduler objectUnderTest = createObjectUnderTest(); @@ -132,6 +187,6 @@ void test_shutdown() { } private DataFileScheduler createObjectUnderTest() { - return new DataFileScheduler(sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer); + return new DataFileScheduler(sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java index a1a520a47a..f5036e8890 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.source.rds.export; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -42,6 +43,11 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler.DEFAULT_CLOSE_DURATION; +import static org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler.DEFAULT_MAX_CLOSE_COUNT; +import static org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler.EXPORT_JOB_FAILURE_COUNT; +import static org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler.EXPORT_JOB_SUCCESS_COUNT; +import static org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler.EXPORT_S3_OBJECTS_TOTAL_COUNT; import static org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler.PARQUET_SUFFIX; @@ -63,6 +69,15 @@ class ExportSchedulerTest { @Mock private PluginMetrics pluginMetrics; + @Mock + private Counter exportJobSuccessCounter; + + @Mock + private Counter exportJobFailureCounter; + + @Mock + private Counter exportS3ObjectsTotalCounter; + @Mock private ExportPartition exportPartition; @@ -73,6 +88,10 @@ class ExportSchedulerTest { @BeforeEach void setUp() { + when(pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT)).thenReturn(exportJobSuccessCounter); + when(pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT)).thenReturn(exportJobFailureCounter); + when(pluginMetrics.counter(EXPORT_S3_OBJECTS_TOTAL_COUNT)).thenReturn(exportS3ObjectsTotalCounter); + exportScheduler = createObjectUnderTest(); } @@ -87,7 +106,8 @@ void test_given_no_export_partition_then_not_export() throws InterruptedExceptio Thread.sleep(100); executorService.shutdownNow(); - verifyNoInteractions(snapshotManager, exportTaskManager, s3Client); + verifyNoInteractions(snapshotManager, exportTaskManager, s3Client, exportJobSuccessCounter, + exportJobFailureCounter, exportS3ObjectsTotalCounter); } @Test @@ -123,9 +143,11 @@ void 
test_given_export_partition_and_export_task_id_then_complete_export() throw any(String.class), any(String.class), any(List.class)); verify(sourceCoordinator).createPartition(any(DataFilePartition.class)); verify(sourceCoordinator).completePartition(exportPartition); + verify(exportJobSuccessCounter).increment(); + verify(exportS3ObjectsTotalCounter).increment(1); + verify(exportJobFailureCounter, never()).increment(); } - @Test void test_given_export_partition_without_export_task_id_then_start_and_complete_export() throws InterruptedException { when(sourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).thenReturn(Optional.of(exportPartition)); @@ -184,6 +206,59 @@ void test_given_export_partition_without_export_task_id_then_start_and_complete_ any(String.class), any(String.class), any(List.class)); verify(sourceCoordinator).createPartition(any(DataFilePartition.class)); verify(sourceCoordinator).completePartition(exportPartition); + verify(exportJobSuccessCounter).increment(); + verify(exportS3ObjectsTotalCounter).increment(1); + verify(exportJobFailureCounter, never()).increment(); + } + + @Test + void test_given_export_partition_and_null_export_task_id_then_close_partition_with_error() throws InterruptedException { + when(sourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).thenReturn(Optional.of(exportPartition)); + when(exportPartition.getPartitionKey()).thenReturn(UUID.randomUUID().toString()); + when(exportProgressState.getExportTaskId()).thenReturn(null); + when(exportPartition.getProgressState()).thenReturn(Optional.of(exportProgressState)); + final String dbIdentifier = UUID.randomUUID().toString(); + when(exportPartition.getDbIdentifier()).thenReturn(dbIdentifier); + + // Mock snapshot response + final String snapshotId = UUID.randomUUID().toString(); + final String snapshotArn = "arn:aws:rds:us-east-1:123456789012:snapshot:" + snapshotId; + final Instant createTime = Instant.now(); + final SnapshotInfo snapshotInfoWhenCreate = new SnapshotInfo( + snapshotId, snapshotArn, createTime, SnapshotStatus.CREATING.getStatusName()); + final SnapshotInfo snapshotInfoWhenComplete = new SnapshotInfo( + snapshotId, snapshotArn, createTime, SnapshotStatus.AVAILABLE.getStatusName()); + when(snapshotManager.createSnapshot(dbIdentifier)).thenReturn(snapshotInfoWhenCreate); + when(snapshotManager.checkSnapshotStatus(snapshotId)).thenReturn(snapshotInfoWhenComplete); + + // Mock export response + when(exportProgressState.getIamRoleArn()).thenReturn(UUID.randomUUID().toString()); + when(exportProgressState.getBucket()).thenReturn(UUID.randomUUID().toString()); + when(exportProgressState.getPrefix()).thenReturn(UUID.randomUUID().toString()); + when(exportProgressState.getKmsKeyId()).thenReturn(UUID.randomUUID().toString()); + when(exportTaskManager.startExportTask(any(String.class), any(String.class), any(String.class), + any(String.class), any(String.class), any(List.class))).thenReturn(null); + + // Act + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(exportScheduler); + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> verify(sourceCoordinator).acquireAvailablePartition(ExportPartition.PARTITION_TYPE)); + Thread.sleep(200); + executorService.shutdownNow(); + + // Assert + verify(snapshotManager).createSnapshot(dbIdentifier); + verify(exportTaskManager).startExportTask( + any(String.class), any(String.class), any(String.class), + any(String.class), any(String.class), any(List.class)); + 
verify(sourceCoordinator).closePartition(exportPartition, DEFAULT_CLOSE_DURATION, DEFAULT_MAX_CLOSE_COUNT); + verify(sourceCoordinator, never()).createPartition(any(DataFilePartition.class)); + verify(sourceCoordinator, never()).completePartition(exportPartition); + + verify(exportJobFailureCounter).increment(); + verify(exportJobSuccessCounter, never()).increment(); + verify(exportS3ObjectsTotalCounter, never()).increment(1); } @Test @@ -193,7 +268,8 @@ void test_shutDown() { final ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(exportScheduler); exportScheduler.shutdown(); - verifyNoMoreInteractions(sourceCoordinator, snapshotManager, exportTaskManager, s3Client); + verifyNoMoreInteractions(sourceCoordinator, snapshotManager, exportTaskManager, s3Client, + exportJobSuccessCounter, exportJobFailureCounter, exportS3ObjectsTotalCounter); executorService.shutdownNow(); } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java index 406a89cec9..30f622c5d7 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java @@ -14,6 +14,7 @@ import org.mockito.Answers; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; @@ -33,6 +34,9 @@ class BinlogEventListenerTest { @Mock(answer = Answers.RETURNS_DEEP_STUBS) private RdsSourceConfig sourceConfig; + @Mock + private PluginMetrics pluginMetrics; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) private com.github.shyiko.mysql.binlog.event.Event binlogEvent; @@ -87,6 +91,6 @@ void test_given_DeleteRows_event_then_calls_correct_handler(EventType eventType) } private BinlogEventListener createObjectUnderTest() { - return new BinlogEventListener(buffer, sourceConfig); + return new BinlogEventListener(buffer, sourceConfig, pluginMetrics); } } \ No newline at end of file
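
For reference, a minimal self-contained sketch of the accounting pattern this patch introduces in DataFileLoader: every record parsed from the export object bumps a total counter and records its serialized size, and the whole batch is then attributed to either the processed counter or the error counter depending on whether the single buffer flush at the end succeeds. The sketch uses Micrometer's SimpleMeterRegistry directly rather than Data Prepper's PluginMetrics wrapper, and a plain list of JSON strings stands in for the Parquet codec output, so it illustrates the metric flow rather than the actual plugin code (assumes micrometer-core on the classpath).

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class ExportMetricsSketch {

    public static void main(String[] args) {
        final SimpleMeterRegistry registry = new SimpleMeterRegistry();

        // Same metric names as the constants added to DataFileLoader
        final Counter recordsTotal = registry.counter("exportRecordsTotal");
        final Counter recordsProcessed = registry.counter("exportRecordsProcessed");
        final Counter recordErrors = registry.counter("exportRecordsProcessingErrors");
        final DistributionSummary bytesReceived = registry.summary("bytesReceived");
        final DistributionSummary bytesProcessed = registry.summary("bytesProcessed");

        // Stand-in for the records the Parquet codec would emit for one S3 object
        final List<String> parsedRecords = List.of("{\"id\":1}", "{\"id\":2}", "{\"id\":3}");
        final AtomicLong eventCount = new AtomicLong();

        for (final String json : parsedRecords) {
            recordsTotal.increment();                        // every record seen
            final long bytes = json.getBytes(StandardCharsets.UTF_8).length;
            bytesReceived.record(bytes);                     // size before transformation

            // ... convert the record and add it to the buffer accumulator here ...

            eventCount.getAndIncrement();
            bytesProcessed.record(bytes);                    // size after successful processing
        }

        // One flush per object: the whole batch counts as processed or as errors
        try {
            // bufferAccumulator.flush() would go here
            recordsProcessed.increment(eventCount.get());
        } catch (final Exception e) {
            recordErrors.increment(eventCount.get());
        }

        System.out.printf("total=%.0f processed=%.0f%n",
                recordsTotal.count(), recordsProcessed.count());
    }
}

Counting successes only at flush time, rather than per record, matches how the patch treats a flush failure: the entire batch of events is attributed to the error counter.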
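
The scheduler-side wiring can be sketched the same way: DataFileScheduler registers per-object success and error counters plus a gauge over an AtomicInteger of in-flight consumers, and increments the counters from the whenComplete callback of the loader future. The snippet below mirrors that shape with a no-op Runnable in place of DataFileLoader; incrementing and decrementing the gauge around the task is shown for illustration only, since the patch registers the gauge over an existing numOfWorkers field whose updates are not part of this change.

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class SchedulerMetricsSketch {

    public static void main(String[] args) {
        final SimpleMeterRegistry registry = new SimpleMeterRegistry();

        final Counter objectsProcessed = registry.counter("exportS3ObjectsProcessed");
        final Counter objectErrors = registry.counter("exportS3ObjectsErrors");
        // Gauge backed by a mutable count of in-flight loader tasks
        final AtomicInteger activeConsumers =
                registry.gauge("activeExportS3ObjectConsumers", new AtomicInteger(0));

        final ExecutorService executor = Executors.newFixedThreadPool(2);

        final Runnable loader = () -> {
            // Stand-in for DataFileLoader.run(); throw here to exercise the error path
        };

        activeConsumers.incrementAndGet();
        CompletableFuture.runAsync(loader, executor)
                .whenComplete((v, ex) -> {
                    activeConsumers.decrementAndGet();
                    if (ex == null) {
                        objectsProcessed.increment();   // object fully loaded into the buffer
                    } else {
                        objectErrors.increment();       // loader failed; partition would be given up
                    }
                })
                .join();

        executor.shutdown();
        System.out.printf("processed=%.0f errors=%.0f%n",
                objectsProcessed.count(), objectErrors.count());
    }
}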
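
Finally, the updated unit tests share one stubbing pattern: PluginMetrics is mocked so that each named counter or summary resolves to a Mockito mock, the component is constructed with it, and the test verifies which metrics were touched. A condensed sketch of that pattern follows, with a hypothetical FlushingComponent standing in for DataFileLoader or DataFileScheduler; the metric names match the constants added in the patch, but the component and its process method are illustrative only (assumes mockito-core and micrometer-core on the classpath).

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class MetricsStubbingSketch {

    /** Tiny stand-in for a component that registers its counters from PluginMetrics. */
    static class FlushingComponent {
        private final Counter success;
        private final Counter errors;

        FlushingComponent(final PluginMetrics pluginMetrics) {
            success = pluginMetrics.counter("exportS3ObjectsProcessed");
            errors = pluginMetrics.counter("exportS3ObjectsErrors");
        }

        void process(final boolean fail) {
            if (fail) {
                errors.increment();
            } else {
                success.increment();
            }
        }
    }

    public static void main(String[] args) {
        // Stub PluginMetrics so the component can register its counters at construction time
        final PluginMetrics pluginMetrics = mock(PluginMetrics.class);
        final Counter successCounter = mock(Counter.class);
        final Counter errorCounter = mock(Counter.class);
        when(pluginMetrics.counter("exportS3ObjectsProcessed")).thenReturn(successCounter);
        when(pluginMetrics.counter("exportS3ObjectsErrors")).thenReturn(errorCounter);

        // Exercise the happy path and assert the expected accounting
        final FlushingComponent component = new FlushingComponent(pluginMetrics);
        component.process(false);

        verify(successCounter).increment();
        verify(errorCounter, never()).increment();
        System.out.println("metric interactions verified");
    }
}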