Add AcknowledgementSet support to DocumentDB/MongoDB streams (opensearch-project#4379)

* Add AcknowledgementSet support to DocumentDB/MongoDB streams

Signed-off-by: Dinu John <[email protected]>

* Update StreamAcknowledgementManagerTest

Signed-off-by: Dinu John <[email protected]>

* Add BackgroundThreadFactory that adds thread name prefix for debugging

Signed-off-by: Dinu John <[email protected]>

* Update unit test

Signed-off-by: Dinu John <[email protected]>

---------

Signed-off-by: Dinu John <[email protected]>
dinujoh authored Apr 2, 2024
1 parent 1a7e099 commit b7c63bc
Showing 15 changed files with 596 additions and 94 deletions.
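
At a high level, this change threads Data Prepper's end-to-end acknowledgment API through the DocumentDB/MongoDB stream path: each checkpoint batch gets an AcknowledgementSet, and the stream position is only checkpointed after the sink acknowledges delivery. A condensed sketch of the pattern (the callback shape matches StreamAcknowledgementManager below; the event and timeout values are illustrative):

// Condensed sketch of the end-to-end acknowledgment pattern used in this commit.
// AcknowledgementSetManager and AcknowledgementSet come from
// org.opensearch.dataprepper.model.acknowledgements.
final AcknowledgementSet ackSet = acknowledgementSetManager.create(result -> {
    if (result) {
        // Positive ack from the sink: safe to advance the stream checkpoint.
    } else {
        // Negative ack: leave the checkpoint unacknowledged so the stream is
        // reprocessed from the last successful checkpoint.
    }
}, Duration.ofMinutes(2)); // illustrative timeout; the real one is configurable
ackSet.add(event);    // register each event written to the buffer
ackSet.complete();    // signal that no more events will be added to this set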
2 changes: 2 additions & 0 deletions data-prepper-plugins/mongodb/build.gradle
@@ -12,10 +12,12 @@ dependencies {
implementation project(path: ':data-prepper-plugins:aws-plugin-api')
implementation project(path: ':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:http-common')
implementation project(path: ':data-prepper-plugins:common')


testImplementation testLibs.mockito.inline
testImplementation testLibs.bundles.junit
testImplementation testLibs.slf4j.simple
testImplementation project(path: ':data-prepper-test-common')

}
data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/CollectionConfig.java
@@ -44,6 +44,16 @@ public ExportConfig getExportConfig() {
return this.exportConfig;
}

public boolean isExportRequired() {
return this.ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM ||
this.ingestionMode == CollectionConfig.IngestionMode.EXPORT;
}

public boolean isStreamRequired() {
return this.ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM ||
this.ingestionMode == CollectionConfig.IngestionMode.STREAM;
}

public static class ExportConfig {
private static final int DEFAULT_ITEMS_PER_PARTITION = 4000;
@JsonProperty("items_per_partition")
@@ -12,8 +12,6 @@
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.StreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -78,12 +76,7 @@ public Event convert(final String record,
}
final EventMetadata eventMetadata = event.getMetadata();

if (dataType.equals(ExportPartition.PARTITION_TYPE)) {
eventMetadata.setAttribute(MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE, ExportPartition.PARTITION_TYPE);
} else if (dataType.equals(StreamPartition.PARTITION_TYPE)) {
eventMetadata.setAttribute(MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE, StreamPartition.PARTITION_TYPE);
}

eventMetadata.setAttribute(MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE, dataType);
eventMetadata.setAttribute(MetadataKeyAttributes.MONGODB_EVENT_COLLECTION_METADATA_ATTRIBUTE, collectionConfig.getCollection());
eventMetadata.setAttribute(MetadataKeyAttributes.MONGODB_EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreationTimeMillis);
eventMetadata.setAttribute(MetadataKeyAttributes.MONGODB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE, eventName);
data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java
@@ -1,11 +1,13 @@
package org.opensearch.dataprepper.plugins.mongo.documentdb;

import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig;
import org.opensearch.dataprepper.plugins.mongo.export.MongoDBExportPartitionSupplier;
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.opensearch.dataprepper.plugins.mongo.export.ExportScheduler;
@@ -15,6 +17,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@@ -24,11 +28,7 @@ public class DocumentDBService {
private final PluginMetrics pluginMetrics;
private final MongoDBSourceConfig sourceConfig;
private final AcknowledgementSetManager acknowledgementSetManager;
private final ExecutorService executor;
private ExportScheduler exportScheduler;
private ExportWorker exportWorker;
private LeaderScheduler leaderScheduler;
private StreamScheduler streamScheduler;
private ExecutorService executor;
private final MongoDBExportPartitionSupplier mongoDBExportPartitionSupplier;
public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator,
final MongoDBSourceConfig sourceConfig,
@@ -38,9 +38,7 @@ public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator,
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.sourceConfig = sourceConfig;

this.mongoDBExportPartitionSupplier = new MongoDBExportPartitionSupplier(sourceConfig);
executor = Executors.newFixedThreadPool(4);
}

/**
@@ -51,23 +49,35 @@ public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator,
* @param buffer Data Prepper Buffer
*/
public void start(Buffer<Record<Event>> buffer) {
this.exportScheduler = new ExportScheduler(sourceCoordinator, mongoDBExportPartitionSupplier, pluginMetrics);
this.exportWorker = new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, sourceConfig);
this.leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig.getCollections());
this.streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, pluginMetrics);
final List<Runnable> runnableList = new ArrayList<>();

final LeaderScheduler leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig.getCollections());
runnableList.add(leaderScheduler);

if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isExportRequired)) {
final ExportScheduler exportScheduler = new ExportScheduler(sourceCoordinator, mongoDBExportPartitionSupplier, pluginMetrics);
final ExportWorker exportWorker = new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, sourceConfig);
runnableList.add(exportScheduler);
runnableList.add(exportWorker);
}

if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isStreamRequired)) {
final StreamScheduler streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, pluginMetrics);
runnableList.add(streamScheduler);
}

executor.submit(leaderScheduler);
executor.submit(exportScheduler);
executor.submit(exportWorker);
executor.submit(streamScheduler);
executor = Executors.newFixedThreadPool(runnableList.size(), BackgroundThreadFactory.defaultExecutorThreadFactory("documentdb-source"));
runnableList.forEach(executor::submit);
}

/**
* Interrupt the running of schedulers.
* Each scheduler must implement logic for graceful shutdown.
*/
public void shutdown() {
LOG.info("shutdown DocumentDB Service scheduler and worker");
executor.shutdownNow();
if (executor != null) {
LOG.info("shutdown DocumentDB Service scheduler and worker");
executor.shutdownNow();
}
}
}
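
The BackgroundThreadFactory referenced above is the new utility from data-prepper-plugins:common noted in the commit message (hence the added common dependency in build.gradle); it prefixes thread names for debugging. A minimal sketch of its shape, which may differ from the actual implementation in the repository:

package org.opensearch.dataprepper.common.concurrent;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: names background threads with a fixed prefix plus a counter,
// e.g. "documentdb-source-1", so thread dumps are easier to read.
public final class BackgroundThreadFactory implements ThreadFactory {
    private final String namePrefix;
    private final AtomicLong threadNumber = new AtomicLong(1);

    private BackgroundThreadFactory(final String namePrefix) {
        this.namePrefix = namePrefix;
    }

    public static BackgroundThreadFactory defaultExecutorThreadFactory(final String namePrefix) {
        return new BackgroundThreadFactory(namePrefix);
    }

    @Override
    public Thread newThread(final Runnable runnable) {
        final Thread thread = new Thread(runnable);
        thread.setName(namePrefix + "-" + threadNumber.getAndIncrement());
        return thread;
    }
}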
@@ -101,15 +101,7 @@ public void run() {
}
}

private boolean isExportRequired(final CollectionConfig.IngestionMode ingestionMode) {
return ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM ||
ingestionMode == CollectionConfig.IngestionMode.EXPORT;
}

private boolean isStreamRequired(final CollectionConfig.IngestionMode ingestionMode) {
return ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM ||
ingestionMode == CollectionConfig.IngestionMode.STREAM;
}
private void init() {
LOG.info("Try to initialize DocumentDB Leader Partition");

@@ -120,13 +112,13 @@ private void init() {
coordinator.createPartition(new GlobalState(collectionConfig.getCollection(), null));

final Instant startTime = Instant.now();
final boolean exportRequired = isExportRequired(collectionConfig.getIngestionMode());
final boolean exportRequired = collectionConfig.isExportRequired();
LOG.info("Ingestion mode {} for Collection {}", collectionConfig.getIngestionMode(), collectionConfig.getCollection());
if (exportRequired) {
createExportPartition(collectionConfig, startTime);
}

if (isStreamRequired(collectionConfig.getIngestionMode())) {
if (collectionConfig.isStreamRequired()) {
createStreamPartition(collectionConfig, startTime, exportRequired);
}

data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/model/CheckpointStatus.java
@@ -0,0 +1,45 @@
package org.opensearch.dataprepper.plugins.mongo.model;

public class CheckpointStatus {
private final String resumeToken;
private final long recordCount;
private boolean acknowledged;
private final long createTimestamp;
private Long acknowledgedTimestamp;

public CheckpointStatus(final String resumeToken, final long recordCount, final long createTimestamp) {
this.resumeToken = resumeToken;
this.recordCount = recordCount;
this.acknowledged = false;
this.createTimestamp = createTimestamp;
}

public void setAcknowledgedTimestamp(final Long acknowledgedTimestamp) {
this.acknowledgedTimestamp = acknowledgedTimestamp;
}

public void setAcknowledged(boolean acknowledged) {
this.acknowledged = acknowledged;
}

public String getResumeToken() {
return resumeToken;
}
public long getRecordCount() {
return recordCount;
}

public boolean isAcknowledged() {
return acknowledged;
}

public long getCreateTimestamp() {
return createTimestamp;
}

public long getAcknowledgedTimestamp() {
return acknowledgedTimestamp;
}


}
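
CheckpointStatus is a plain state holder whose lifecycle is driven by StreamAcknowledgementManager below: it is created unacknowledged when an acknowledgment set is created, then stamped and flipped to acknowledged by the sink callback. A minimal illustration (values are examples; uses java.time.Instant):

// Freshly created checkpoints report isAcknowledged() == false.
final CheckpointStatus status = new CheckpointStatus("resume-token-1", 42L, Instant.now().toEpochMilli());

// Later, from the acknowledgment callback:
status.setAcknowledgedTimestamp(Instant.now().toEpochMilli());
status.setAcknowledged(true);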
@@ -50,16 +50,16 @@ private void setProgressState(final String resumeToken, final long recordNumber)
}

/**
* This method is to do a checkpoint with latest sequence number processed.
* Note that this should be called on a regular basis even there are no changes to sequence number
* This method performs a checkpoint with the latest resume token processed.
* Note that this should be called on a regular basis even if there are no changes to the resume token,
* as the checkpoint will also extend the timeout for the lease.
*
* @param resumeToken
* @param recordNumber The last record number
* @param resumeToken checkpoint token used to resume the stream when a MongoDB/DocumentDB cursor is opened
* @param recordCount The last processed record count
*/
public void checkpoint(final String resumeToken, final long recordNumber) {
LOG.debug("Checkpoint stream partition for collection " + streamPartition.getCollection() + " with record number " + recordNumber);
setProgressState(resumeToken, recordNumber);
public void checkpoint(final String resumeToken, final long recordCount) {
LOG.debug("Checkpoint stream partition for collection " + streamPartition.getCollection() + " with record number " + recordCount);
setProgressState(resumeToken, recordCount);
enhancedSourceCoordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
}

@@ -76,4 +76,8 @@ public Optional<StreamLoadStatus> getGlobalStreamLoadStatus() {
public void updateStreamPartitionForAcknowledgmentWait(final Duration acknowledgmentSetTimeout) {
enhancedSourceCoordinator.saveProgressStateForPartition(streamPartition, acknowledgmentSetTimeout);
}

public void giveUpPartition() {
enhancedSourceCoordinator.giveUpPartition(streamPartition);
}
}
data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java
@@ -0,0 +1,127 @@
package org.opensearch.dataprepper.plugins.mongo.stream;

import com.google.common.annotations.VisibleForTesting;
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.plugins.mongo.model.CheckpointStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class StreamAcknowledgementManager {
private static final Logger LOG = LoggerFactory.getLogger(StreamAcknowledgementManager.class);
private final ConcurrentLinkedQueue<CheckpointStatus> checkpoints = new ConcurrentLinkedQueue<>();
private final ConcurrentHashMap<String, CheckpointStatus> ackStatus = new ConcurrentHashMap<>();
private final AcknowledgementSetManager acknowledgementSetManager;
private final DataStreamPartitionCheckpoint partitionCheckpoint;

private final Duration partitionAcknowledgmentTimeout;
private final int acknowledgementMonitorWaitTimeInMs;
private final int checkPointIntervalInMs;
private final ExecutorService executorService;

private boolean enableAcknowledgement = false;

public StreamAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager,
final DataStreamPartitionCheckpoint partitionCheckpoint,
final Duration partitionAcknowledgmentTimeout,
final int acknowledgementMonitorWaitTimeInMs,
final int checkPointIntervalInMs) {
this.acknowledgementSetManager = acknowledgementSetManager;
this.partitionCheckpoint = partitionCheckpoint;
this.partitionAcknowledgmentTimeout = partitionAcknowledgmentTimeout;
this.acknowledgementMonitorWaitTimeInMs = acknowledgementMonitorWaitTimeInMs;
this.checkPointIntervalInMs = checkPointIntervalInMs;
executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("mongodb-stream-ack-monitor"));
}

void init(final Consumer<Void> stopWorkerConsumer) {
enableAcknowledgement = true;
executorService.submit(() -> monitorAcknowledgment(executorService, stopWorkerConsumer));
}

private void monitorAcknowledgment(final ExecutorService executorService, final Consumer<Void> stopWorkerConsumer) {
long lastCheckpointTime = System.currentTimeMillis();
CheckpointStatus lastCheckpointStatus = null;
while (!Thread.currentThread().isInterrupted()) {
final CheckpointStatus checkpointStatus = checkpoints.peek();
if (checkpointStatus != null) {
if (checkpointStatus.isAcknowledged()) {
lastCheckpointStatus = checkpoints.poll();
ackStatus.remove(checkpointStatus.getResumeToken());
if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) {
LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
partitionCheckpoint.checkpoint(checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
lastCheckpointTime = System.currentTimeMillis();
}
} else {
LOG.debug("Checkpoint not complete for resume token {}", checkpointStatus.getResumeToken());
final Duration ackWaitDuration = Duration.between(Instant.ofEpochMilli(checkpointStatus.getCreateTimestamp()), Instant.now());
// Acknowledgement not received for the checkpoint after twice the ack wait time
if (ackWaitDuration.getSeconds() >= partitionAcknowledgmentTimeout.getSeconds() * 2) {
// Give up the partition and interrupt the parent thread to stop processing the stream
if (lastCheckpointStatus != null && lastCheckpointStatus.isAcknowledged()) {
partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
}
LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition.", checkpointStatus.getResumeToken());
partitionCheckpoint.giveUpPartition();
break;
}
}
}

try {
Thread.sleep(acknowledgementMonitorWaitTimeInMs);
} catch (InterruptedException ex) {
break;
}
}
stopWorkerConsumer.accept(null);
executorService.shutdown();
}

Optional<AcknowledgementSet> createAcknowledgementSet(final String resumeToken, final long recordNumber) {
if (!enableAcknowledgement) {
return Optional.empty();
}

final CheckpointStatus checkpointStatus = new CheckpointStatus(resumeToken, recordNumber, Instant.now().toEpochMilli());
checkpoints.add(checkpointStatus);
ackStatus.put(resumeToken, checkpointStatus);
return Optional.of(acknowledgementSetManager.create((result) -> {
if (result) {
final CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken);
ackCheckpointStatus.setAcknowledgedTimestamp(Instant.now().toEpochMilli());
ackCheckpointStatus.setAcknowledged(true);
LOG.debug("Received acknowledgment of completion from sink for checkpoint {}", resumeToken);
} else {
LOG.warn("Negative acknowledgment received for checkpoint {}, resetting checkpoint", resumeToken);
// The default CheckpointStatus acknowledged value is false. The acknowledgment monitor will time out
// and reprocess the stream from the last successful checkpoint, in order.
}
}, partitionAcknowledgmentTimeout));
}

void shutdown() {
executorService.shutdown();
}

@VisibleForTesting
ConcurrentHashMap<String, CheckpointStatus> getAcknowledgementStatus() {
return ackStatus;
}

@VisibleForTesting
ConcurrentLinkedQueue<CheckpointStatus> getCheckpoints() {
return checkpoints;
}
}
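
For context, the stream worker that drives this manager is changed elsewhere in this commit but not rendered on this page. Its interaction is roughly the following sketch; the worker-side helpers (stopProcessing, stopped, readNextBatch, latestResumeToken, writeToBuffer) are hypothetical names:

// Hypothetical worker loop showing how StreamAcknowledgementManager is driven.
streamAcknowledgementManager.init(unused -> stopProcessing()); // stop callback

long recordCount = 0;
while (!stopped()) {
    final List<Event> batch = readNextBatch();        // hypothetical
    final String resumeToken = latestResumeToken();   // hypothetical
    recordCount += batch.size();

    final Optional<AcknowledgementSet> ackSet =
            streamAcknowledgementManager.createAcknowledgementSet(resumeToken, recordCount);
    ackSet.ifPresent(set -> batch.forEach(set::add)); // register before buffering
    writeToBuffer(batch);                             // hypothetical
    ackSet.ifPresent(AcknowledgementSet::complete);   // no more events for this set
}

The monitor thread then checkpoints acknowledged resume tokens every checkPointIntervalInMs, and gives up the partition (forcing reprocessing from the last successful checkpoint) when an acknowledgment is overdue.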