Skip to content

Commit

Permalink
Add reset checkpoint method to mongo/docdb partition checkpoint class
Browse files Browse the repository at this point in the history
Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh committed May 2, 2024
1 parent a805440 commit 67d3e47
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,22 @@ private void setProgressState(final String resumeToken, final long recordNumber)
* @param recordCount The last processed record count
*/
public void checkpoint(final String resumeToken, final long recordCount) {
LOG.debug("Checkpoint stream partition for collection " + streamPartition.getCollection() + " with record number " + recordCount);
LOG.debug("Checkpoint stream partition for collection {} with record number {}", streamPartition.getCollection(), recordCount);
setProgressState(resumeToken, recordCount);
enhancedSourceCoordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
}

/**
* This method is to reset checkpoint when change stream is invalid. The current thread will give up partition and new thread
* will take ownership of partition. If change stream is valid then new thread proceeds with processing change stream else the
* process repeats.
*/
public void resetCheckpoint() {
LOG.debug("Resetting checkpoint stream partition for collection {}", streamPartition.getCollection());
setProgressState(null, 0);
enhancedSourceCoordinator.giveUpPartition(streamPartition);
}

public Optional<StreamLoadStatus> getGlobalStreamLoadStatus() {
final Optional<EnhancedSourcePartition> partition = enhancedSourceCoordinator.getPartition(STREAM_PREFIX + streamPartition.getPartitionKey());
if(partition.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -41,6 +42,10 @@ public class StreamWorker {
private static final Logger LOG = LoggerFactory.getLogger(StreamWorker.class);
private static final int DEFAULT_EXPORT_COMPLETE_WAIT_INTERVAL_MILLIS = 90_000;
private static final String COLLECTION_SPLITTER = "\\.";
private static final Set<OperationType> CRUD_OPERATION_TYPE = Set.of(OperationType.INSERT, OperationType.DELETE,
OperationType.UPDATE, OperationType.REPLACE);
private static final Set<OperationType> STREAM_TERMINATE_OPERATION_TYPE = Set.of(OperationType.INVALIDATE,
OperationType.DROP, OperationType.DROP_DATABASE);
static final String SUCCESS_ITEM_COUNTER_NAME = "streamRecordsSuccessTotal";
static final String FAILURE_ITEM_COUNTER_NAME = "streamRecordsFailedTotal";
static final String BYTES_RECEIVED = "bytesReceived";
Expand Down Expand Up @@ -210,7 +215,7 @@ record = document.getFullDocument().toJson(writerSettings);
} else if(shouldTerminateChangeStream(operationType)){
stop();
partitionCheckpoint.resetCheckpoint();
LOG.warn("The change stream is invalid due to stream operation type {}. Stopping the change stream.", operationType);
LOG.warn("The change stream is invalid due to stream operation type {}. Stopping the current change stream. New thread should restart the stream.", operationType);
} else {
LOG.warn("The change stream operation type {} is not handled", operationType);
}
Expand All @@ -222,7 +227,7 @@ record = document.getFullDocument().toJson(writerSettings);
failureItemsCounter.increment(records.size());
}
} else {
LOG.warn("The change stream cursor didn't return any document. Stopping the change stream.");
LOG.warn("The change stream cursor didn't return any document. Stopping the change stream. New thread should restart the stream.");
stop();
partitionCheckpoint.resetCheckpoint();
}
Expand All @@ -249,16 +254,11 @@ record = document.getFullDocument().toJson(writerSettings);
}

private boolean isCRUDOperation(final OperationType operationType) {
return OperationType.INSERT == operationType ||
OperationType.DELETE == operationType ||
OperationType.UPDATE == operationType ||
OperationType.REPLACE == operationType;
return CRUD_OPERATION_TYPE.contains(operationType);
}

private boolean shouldTerminateChangeStream(final OperationType operationType) {
return OperationType.INVALIDATE == operationType ||
OperationType.DROP == operationType ||
OperationType.DROP_DATABASE == operationType;
return STREAM_TERMINATE_OPERATION_TYPE.contains(operationType);
}

private void writeToBuffer(final List<Event> records, final String checkPointToken, final long recordCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class DataStreamPartitionCheckpointTest {
private DataStreamPartitionCheckpoint dataStreamPartitionCheckpoint;

@Test
public void checkpoint() {
public void checkpoint_success() {
final int recordNumber = new Random().nextInt();
final String checkpointToken = UUID.randomUUID().toString();
when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState));
Expand All @@ -46,10 +46,20 @@ public void checkpoint() {
}

@Test
public void updateStreamPartitionForAcknowledgmentWait() {
public void updateStreamPartitionForAcknowledgmentWait_success() {
final int minutes = new Random().nextInt();
final Duration duration = Duration.ofMinutes(minutes);
dataStreamPartitionCheckpoint.updateStreamPartitionForAcknowledgmentWait(duration);
verify(enhancedSourceCoordinator).saveProgressStateForPartition(streamPartition, duration);
}

@Test
public void resetCheckpoint_success() {
when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState));
dataStreamPartitionCheckpoint.resetCheckpoint();
verify(enhancedSourceCoordinator).giveUpPartition(streamPartition);
verify(streamProgressState).setResumeToken(null);
verify(streamProgressState).setLoadedRecords(0);
verify(streamProgressState).setLastUpdateTimestamp(anyLong());
}
}

0 comments on commit 67d3e47

Please sign in to comment.