diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpoint.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpoint.java index 5726468ae6..c666984eda 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpoint.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpoint.java @@ -60,11 +60,22 @@ private void setProgressState(final String resumeToken, final long recordNumber) * @param recordCount The last processed record count */ public void checkpoint(final String resumeToken, final long recordCount) { - LOG.debug("Checkpoint stream partition for collection " + streamPartition.getCollection() + " with record number " + recordCount); + LOG.debug("Checkpoint stream partition for collection {} with record number {}", streamPartition.getCollection(), recordCount); setProgressState(resumeToken, recordCount); enhancedSourceCoordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE); } + /** + * This method is to reset checkpoint when change stream is invalid. The current thread will give up partition and new thread + * will take ownership of partition. If change stream is valid then new thread proceeds with processing change stream else the + * process repeats. + */ + public void resetCheckpoint() { + LOG.debug("Resetting checkpoint stream partition for collection {}", streamPartition.getCollection()); + setProgressState(null, 0); + enhancedSourceCoordinator.giveUpPartition(streamPartition); + } + public Optional getGlobalStreamLoadStatus() { final Optional partition = enhancedSourceCoordinator.getPartition(STREAM_PREFIX + streamPartition.getPartitionKey()); if(partition.isPresent()) { diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java index 1bc678a077..b38e39057e 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -41,6 +42,10 @@ public class StreamWorker { private static final Logger LOG = LoggerFactory.getLogger(StreamWorker.class); private static final int DEFAULT_EXPORT_COMPLETE_WAIT_INTERVAL_MILLIS = 90_000; private static final String COLLECTION_SPLITTER = "\\."; + private static final Set CRUD_OPERATION_TYPE = Set.of(OperationType.INSERT, OperationType.DELETE, + OperationType.UPDATE, OperationType.REPLACE); + private static final Set STREAM_TERMINATE_OPERATION_TYPE = Set.of(OperationType.INVALIDATE, + OperationType.DROP, OperationType.DROP_DATABASE); static final String SUCCESS_ITEM_COUNTER_NAME = "streamRecordsSuccessTotal"; static final String FAILURE_ITEM_COUNTER_NAME = "streamRecordsFailedTotal"; static final String BYTES_RECEIVED = "bytesReceived"; @@ -210,7 +215,7 @@ record = document.getFullDocument().toJson(writerSettings); } else if(shouldTerminateChangeStream(operationType)){ stop(); partitionCheckpoint.resetCheckpoint(); - LOG.warn("The change stream is invalid due to stream operation type {}. Stopping the change stream.", operationType); + LOG.warn("The change stream is invalid due to stream operation type {}. Stopping the current change stream. New thread should restart the stream.", operationType); } else { LOG.warn("The change stream operation type {} is not handled", operationType); } @@ -222,7 +227,7 @@ record = document.getFullDocument().toJson(writerSettings); failureItemsCounter.increment(records.size()); } } else { - LOG.warn("The change stream cursor didn't return any document. Stopping the change stream."); + LOG.warn("The change stream cursor didn't return any document. Stopping the change stream. New thread should restart the stream."); stop(); partitionCheckpoint.resetCheckpoint(); } @@ -249,16 +254,11 @@ record = document.getFullDocument().toJson(writerSettings); } private boolean isCRUDOperation(final OperationType operationType) { - return OperationType.INSERT == operationType || - OperationType.DELETE == operationType || - OperationType.UPDATE == operationType || - OperationType.REPLACE == operationType; + return CRUD_OPERATION_TYPE.contains(operationType); } private boolean shouldTerminateChangeStream(final OperationType operationType) { - return OperationType.INVALIDATE == operationType || - OperationType.DROP == operationType || - OperationType.DROP_DATABASE == operationType; + return STREAM_TERMINATE_OPERATION_TYPE.contains(operationType); } private void writeToBuffer(final List records, final String checkPointToken, final long recordCount) { diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpointTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpointTest.java index 6aaf3badb7..07649c32ab 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpointTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpointTest.java @@ -34,7 +34,7 @@ public class DataStreamPartitionCheckpointTest { private DataStreamPartitionCheckpoint dataStreamPartitionCheckpoint; @Test - public void checkpoint() { + public void checkpoint_success() { final int recordNumber = new Random().nextInt(); final String checkpointToken = UUID.randomUUID().toString(); when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); @@ -46,10 +46,20 @@ public void checkpoint() { } @Test - public void updateStreamPartitionForAcknowledgmentWait() { + public void updateStreamPartitionForAcknowledgmentWait_success() { final int minutes = new Random().nextInt(); final Duration duration = Duration.ofMinutes(minutes); dataStreamPartitionCheckpoint.updateStreamPartitionForAcknowledgmentWait(duration); verify(enhancedSourceCoordinator).saveProgressStateForPartition(streamPartition, duration); } + + @Test + public void resetCheckpoint_success() { + when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); + dataStreamPartitionCheckpoint.resetCheckpoint(); + verify(enhancedSourceCoordinator).giveUpPartition(streamPartition); + verify(streamProgressState).setResumeToken(null); + verify(streamProgressState).setLoadedRecords(0); + verify(streamProgressState).setLastUpdateTimestamp(anyLong()); + } }