Commit

Add rds source metrics (opensearch-project#4769)
* Add rds source metrics

Signed-off-by: Hai Yan <[email protected]>

* Remove unused imports

Signed-off-by: Hai Yan <[email protected]>

* Add exportS3ObjectsErrors metric

Signed-off-by: Hai Yan <[email protected]>

---------

Signed-off-by: Hai Yan <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
oeyh authored and Krishna Kondaka committed Aug 8, 2024
1 parent ae545d3 commit 567842d
Showing 11 changed files with 404 additions and 41 deletions.
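The files shown here register the following metrics through PluginMetrics:

- Export job (ExportScheduler): exportJobSuccess, exportJobFailure, exportS3ObjectsTotal (counters)
- S3 data files (DataFileScheduler): exportS3ObjectsProcessed, exportS3ObjectsErrors (counters), activeExportS3ObjectConsumers (gauge)
- Records (DataFileLoader): exportRecordsTotal, exportRecordsProcessed, exportRecordsProcessingErrors (counters), bytesReceived, bytesProcessed (distribution summaries)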
3 changes: 3 additions & 0 deletions data-prepper-plugins/rds-source/build.gradle
@@ -26,4 +26,7 @@ dependencies {
testImplementation project(path: ':data-prepper-test-common')
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(path: ':data-prepper-test-event')
+ testImplementation libs.avro.core
+ testImplementation libs.parquet.hadoop
+ testImplementation libs.parquet.avro
}
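The added avro/parquet test dependencies suggest the new unit tests feed Parquet fixtures through the ParquetInputCodec path. A minimal sketch of such a fixture, assuming only standard parquet-avro APIs; the class name and schema are illustrative, not taken from this PR:

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

class ParquetFixtureSketch {
    // Writes a one-row Parquet file that a test could hand to the codec under test
    static File writeSampleFile() throws IOException {
        final Schema schema = SchemaBuilder.record("Row").fields()
                .requiredInt("id").requiredString("name").endRecord();
        final File file = File.createTempFile("export", ".parquet");
        file.delete(); // the writer refuses to overwrite an existing file
        try (ParquetWriter<GenericRecord> writer =
                     AvroParquetWriter.<GenericRecord>builder(new Path(file.getAbsolutePath()))
                             .withSchema(schema)
                             .build()) {
            final GenericRecord row = new GenericData.Record(schema);
            row.put("id", 1);
            row.put("name", "alice");
            writer.write(row);
        }
        return file;
    }
}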
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java
@@ -88,7 +88,7 @@ public void start(Buffer<Record<Event>> buffer) {
exportScheduler = new ExportScheduler(
sourceCoordinator, snapshotManager, exportTaskManager, s3Client, pluginMetrics);
dataFileScheduler = new DataFileScheduler(
- sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer);
+ sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics);
runnableList.add(exportScheduler);
runnableList.add(dataFileScheduler);
}
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java
@@ -5,7 +5,10 @@

package org.opensearch.dataprepper.plugins.source.rds.export;

+ import io.micrometer.core.instrument.Counter;
+ import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
+ import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
@@ -18,12 +21,18 @@
import java.io.InputStream;
import java.time.Duration;
import java.util.List;
+ import java.util.concurrent.atomic.AtomicLong;

public class DataFileLoader implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(DataFileLoader.class);

static final Duration VERSION_OVERLAP_TIME_FOR_EXPORT = Duration.ofMinutes(5);
+ static final String EXPORT_RECORDS_TOTAL_COUNT = "exportRecordsTotal";
+ static final String EXPORT_RECORDS_PROCESSED_COUNT = "exportRecordsProcessed";
+ static final String EXPORT_RECORDS_PROCESSING_ERROR_COUNT = "exportRecordsProcessingErrors";
+ static final String BYTES_RECEIVED = "bytesReceived";
+ static final String BYTES_PROCESSED = "bytesProcessed";

private final DataFilePartition dataFilePartition;
private final String bucket;
@@ -32,37 +41,56 @@ public class DataFileLoader implements Runnable {
private final InputCodec codec;
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final ExportRecordConverter recordConverter;
+ private final Counter exportRecordsTotalCounter;
+ private final Counter exportRecordSuccessCounter;
+ private final Counter exportRecordErrorCounter;
+ private final DistributionSummary bytesReceivedSummary;
+ private final DistributionSummary bytesProcessedSummary;

private DataFileLoader(final DataFilePartition dataFilePartition,
- final InputCodec codec,
- final BufferAccumulator<Record<Event>> bufferAccumulator,
- final S3ObjectReader objectReader,
- final ExportRecordConverter recordConverter) {
+ final InputCodec codec,
+ final BufferAccumulator<Record<Event>> bufferAccumulator,
+ final S3ObjectReader objectReader,
+ final ExportRecordConverter recordConverter,
+ final PluginMetrics pluginMetrics) {
this.dataFilePartition = dataFilePartition;
bucket = dataFilePartition.getBucket();
objectKey = dataFilePartition.getKey();
this.objectReader = objectReader;
this.codec = codec;
this.bufferAccumulator = bufferAccumulator;
this.recordConverter = recordConverter;

+ exportRecordsTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT);
+ exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT);
+ exportRecordErrorCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSING_ERROR_COUNT);
+ bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
+ bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
}

public static DataFileLoader create(final DataFilePartition dataFilePartition,
final InputCodec codec,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final S3ObjectReader objectReader,
- final ExportRecordConverter recordConverter) {
- return new DataFileLoader(dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter);
+ final ExportRecordConverter recordConverter,
+ final PluginMetrics pluginMetrics) {
+ return new DataFileLoader(dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter, pluginMetrics);
}

@Override
public void run() {
LOG.info("Start loading s3://{}/{}", bucket, objectKey);

+ AtomicLong eventCount = new AtomicLong();
try (InputStream inputStream = objectReader.readFile(bucket, objectKey)) {

codec.parse(inputStream, record -> {
try {
+ exportRecordsTotalCounter.increment();
+ final Event event = record.getData();
+ final String string = event.toJsonString();
+ final long bytes = string.getBytes().length;
+ bytesReceivedSummary.record(bytes);

DataFileProgressState progressState = dataFilePartition.getProgressState().get();

// TODO: primary key to be obtained by querying database schema
@@ -79,6 +107,8 @@ public void run() {
snapshotTime,
eventVersionNumber));
bufferAccumulator.add(transformedRecord);
+ eventCount.getAndIncrement();
+ bytesProcessedSummary.record(bytes);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -92,8 +122,10 @@

try {
bufferAccumulator.flush();
+ exportRecordSuccessCounter.increment(eventCount.get());
} catch (Exception e) {
LOG.error("Failed to write events to buffer", e);
+ exportRecordErrorCounter.increment(eventCount.get());
}
}
}
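A minimal sketch of how these DataFileLoader meters could be wired in a unit test, assuming Mockito mocks for PluginMetrics and its meters; the test shape is illustrative, not taken from this PR:

package org.opensearch.dataprepper.plugins.source.rds.export;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.metrics.PluginMetrics;

class DataFileLoaderMetricsSketch {

    void stubMeters() {
        final PluginMetrics pluginMetrics = mock(PluginMetrics.class);
        final Counter recordsTotal = mock(Counter.class);
        final DistributionSummary bytesReceived = mock(DistributionSummary.class);

        // Hand mock meters to the constructor lookups shown above
        when(pluginMetrics.counter(DataFileLoader.EXPORT_RECORDS_TOTAL_COUNT)).thenReturn(recordsTotal);
        when(pluginMetrics.summary(DataFileLoader.BYTES_RECEIVED)).thenReturn(bytesReceived);

        // Stub the remaining counter()/summary() names the same way, build the loader
        // via DataFileLoader.create(...), call run() against a small object, then
        // verify(recordsTotal).increment() and verify(bytesReceived).record(...)
        // fire once per parsed record.
    }
}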
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java
@@ -5,7 +5,9 @@

package org.opensearch.dataprepper.plugins.source.rds.export;

+ import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
+ import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
@@ -48,6 +50,9 @@ public class DataFileScheduler implements Runnable {

static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
+ static final String EXPORT_S3_OBJECTS_PROCESSED_COUNT = "exportS3ObjectsProcessed";
+ static final String EXPORT_S3_OBJECTS_ERROR_COUNT = "exportS3ObjectsErrors";
+ static final String ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE = "activeExportS3ObjectConsumers";


private final EnhancedSourceCoordinator sourceCoordinator;
@@ -57,21 +62,33 @@ public class DataFileScheduler implements Runnable {
private final InputCodec codec;
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final ExportRecordConverter recordConverter;
+ private final PluginMetrics pluginMetrics;

+ private final Counter exportFileSuccessCounter;
+ private final Counter exportFileErrorCounter;
+ private final AtomicInteger activeExportS3ObjectConsumersGauge;

private volatile boolean shutdownRequested = false;

public DataFileScheduler(final EnhancedSourceCoordinator sourceCoordinator,
final RdsSourceConfig sourceConfig,
final S3Client s3Client,
final EventFactory eventFactory,
- final Buffer<Record<Event>> buffer) {
+ final Buffer<Record<Event>> buffer,
+ final PluginMetrics pluginMetrics) {
this.sourceCoordinator = sourceCoordinator;
this.sourceConfig = sourceConfig;
codec = new ParquetInputCodec(eventFactory);
bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
objectReader = new S3ObjectReader(s3Client);
recordConverter = new ExportRecordConverter();
executor = Executors.newFixedThreadPool(DATA_LOADER_MAX_JOB_COUNT);
+ this.pluginMetrics = pluginMetrics;

+ this.exportFileSuccessCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT);
+ this.exportFileErrorCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_ERROR_COUNT);
+ this.activeExportS3ObjectConsumersGauge = pluginMetrics.gauge(
+ ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE, numOfWorkers, AtomicInteger::get);
}

@Override
@@ -116,15 +133,18 @@ public void shutdown() {
}

private void processDataFilePartition(DataFilePartition dataFilePartition) {
- Runnable loader = DataFileLoader.create(dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter);
+ Runnable loader = DataFileLoader.create(
+ dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter, pluginMetrics);
CompletableFuture runLoader = CompletableFuture.runAsync(loader, executor);

runLoader.whenComplete((v, ex) -> {
if (ex == null) {
+ exportFileSuccessCounter.increment();
// Update global state so we know if all s3 files have been loaded
updateLoadStatus(dataFilePartition.getExportTaskId(), DEFAULT_UPDATE_LOAD_STATUS_TIMEOUT);
sourceCoordinator.completePartition(dataFilePartition);
} else {
+ exportFileErrorCounter.increment();
LOG.error("There was an exception while processing an S3 data file", ex);
sourceCoordinator.giveUpPartition(dataFilePartition);
}
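The activeExportS3ObjectConsumers gauge uses Micrometer's state-object pattern: the registry keeps a reference to numOfWorkers and samples it on each read, so the scheduler only has to mutate the AtomicInteger as loaders start and finish. A standalone sketch of the same pattern against a plain SimpleMeterRegistry (illustrative, not from this PR):

import java.util.concurrent.atomic.AtomicInteger;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class ActiveConsumersGaugeSketch {
    public static void main(String[] args) {
        final MeterRegistry registry = new SimpleMeterRegistry();
        final AtomicInteger numOfWorkers = new AtomicInteger(0);

        // The gauge holds a reference to numOfWorkers and samples it on every read
        registry.gauge("activeExportS3ObjectConsumers", numOfWorkers, AtomicInteger::get);

        numOfWorkers.incrementAndGet(); // a data file loader was submitted
        System.out.println(registry.get("activeExportS3ObjectConsumers").gauge().value()); // 1.0
        numOfWorkers.decrementAndGet(); // the loader completed
    }
}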
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java
@@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.rds.export;

+ import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
@@ -42,12 +43,15 @@ public class ExportScheduler implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ExportScheduler.class);

private static final int DEFAULT_TAKE_LEASE_INTERVAL_MILLIS = 60_000;
- private static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMinutes(10);
- private static final int DEFAULT_MAX_CLOSE_COUNT = 36;
+ static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMinutes(10);
+ static final int DEFAULT_MAX_CLOSE_COUNT = 36;
private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 5 * 60_000;
private static final int DEFAULT_CHECK_STATUS_INTERVAL_MILLS = 30 * 1000;
private static final Duration DEFAULT_SNAPSHOT_STATUS_CHECK_TIMEOUT = Duration.ofMinutes(60);
static final String PARQUET_SUFFIX = ".parquet";
+ static final String EXPORT_JOB_SUCCESS_COUNT = "exportJobSuccess";
+ static final String EXPORT_JOB_FAILURE_COUNT = "exportJobFailure";
+ static final String EXPORT_S3_OBJECTS_TOTAL_COUNT = "exportS3ObjectsTotal";

private final S3Client s3Client;
private final PluginMetrics pluginMetrics;
Expand All @@ -56,6 +60,10 @@ public class ExportScheduler implements Runnable {
private final ExportTaskManager exportTaskManager;
private final SnapshotManager snapshotManager;

+ private final Counter exportJobSuccessCounter;
+ private final Counter exportJobFailureCounter;
+ private final Counter exportS3ObjectsTotalCounter;

private volatile boolean shutdownRequested = false;

public ExportScheduler(final EnhancedSourceCoordinator sourceCoordinator,
@@ -69,6 +77,10 @@ public ExportScheduler(final EnhancedSourceCoordinator sourceCoordinator,
this.executor = Executors.newCachedThreadPool();
this.snapshotManager = snapshotManager;
this.exportTaskManager = exportTaskManager;

+ exportJobSuccessCounter = pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT);
+ exportJobFailureCounter = pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT);
+ exportS3ObjectsTotalCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_TOTAL_COUNT);
}

@Override
@@ -133,17 +145,15 @@ private String getOrCreateExportTaskId(ExportPartition exportPartition) {
progressState.setSnapshotId(snapshotInfo.getSnapshotId());
sourceCoordinator.saveProgressStateForPartition(exportPartition, null);
} else {
LOG.error("The snapshot failed to create, it will be retried");
closeExportPartitionWithError(exportPartition);
LOG.error("The snapshot failed to create. The export will be retried");
return null;
}

final String snapshotId = snapshotInfo.getSnapshotId();
try {
snapshotInfo = checkSnapshotStatus(snapshotId, DEFAULT_SNAPSHOT_STATUS_CHECK_TIMEOUT);
} catch (Exception e) {
LOG.warn("Check snapshot status for {} failed", snapshotId, e);
sourceCoordinator.giveUpPartition(exportPartition);
LOG.warn("Check snapshot status for {} failed. The export will be retried", snapshotId, e);
return null;
}
progressState.setSnapshotTime(snapshotInfo.getCreateTime().toEpochMilli());
@@ -159,14 +169,14 @@ private String getOrCreateExportTaskId(ExportPartition exportPartition) {
sourceCoordinator.saveProgressStateForPartition(exportPartition, null);
} else {
LOG.error("The export task failed to create, it will be retried");
- closeExportPartitionWithError(exportPartition);
return null;
}

return exportTaskId;
}

private void closeExportPartitionWithError(ExportPartition exportPartition) {
+ exportJobFailureCounter.increment();
ExportProgressState exportProgressState = exportPartition.getProgressState().get();
// Clear current task id, so that a new export can be submitted.
exportProgressState.setExportTaskId(null);
@@ -309,12 +319,15 @@ private void createDataFilePartitions(String bucket, String exportTaskId, List<S
totalFiles.getAndIncrement();
}

+ exportS3ObjectsTotalCounter.increment(totalFiles.get());

// Create a global state to track overall progress for data file processing
LoadStatus loadStatus = new LoadStatus(totalFiles.get(), 0);
sourceCoordinator.createPartition(new GlobalState(exportTaskId, loadStatus.toMap()));
}

private void completeExportPartition(ExportPartition exportPartition) {
+ exportJobSuccessCounter.increment();
ExportProgressState progressState = exportPartition.getProgressState().get();
progressState.setStatus("Completed");
sourceCoordinator.completePartition(exportPartition);
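Note the two increment styles in this file versus DataFileLoader: exportS3ObjectsTotal is bumped once per export with Counter.increment(double), sized by the file count, while per-record counters use the no-arg form. A small sketch of both, assuming only the Micrometer core API:

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class ExportCounterSketch {
    public static void main(String[] args) {
        final SimpleMeterRegistry registry = new SimpleMeterRegistry();
        final Counter s3ObjectsTotal = registry.counter("exportS3ObjectsTotal");

        s3ObjectsTotal.increment(42);   // batch style: once per export, sized by file count
        s3ObjectsTotal.increment();     // per-item style: once per object or record
        System.out.println(s3ObjectsTotal.count()); // 43.0
    }
}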