Skip to content

Commit

Permalink
Unit test update
Browse files Browse the repository at this point in the history
Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh committed Apr 1, 2024
1 parent 9c7e6b8 commit e7d3b94
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ private void setProgressState(final String resumeToken, final long recordNumber)
}

/**
* This method is to do a checkpoint with latest sequence number processed.
* Note that this should be called on a regular basis even there are no changes to sequence number
* This method is to do a checkpoint with latest resume token processed.
* Note that this should be called on a regular basis even there are no changes to resume token
* As the checkpoint will also extend the timeout for the lease
*
* @param resumeToken
* @param recordNumber The last record number
* @param resumeToken checkpoint token to start resuming the stream when MongoDB/DocumentDB cursor is open
* @param recordCount The last processed record count
*/
public void checkpoint(final String resumeToken, final long recordNumber) {
LOG.debug("Checkpoint stream partition for collection " + streamPartition.getCollection() + " with record number " + recordNumber);
setProgressState(resumeToken, recordNumber);
public void checkpoint(final String resumeToken, final long recordCount) {
LOG.debug("Checkpoint stream partition for collection " + streamPartition.getCollection() + " with record number " + recordCount);
setProgressState(resumeToken, recordCount);
enhancedSourceCoordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -91,23 +90,23 @@ void test_processStream_success() {
MongoCollection col = mock(MongoCollection.class);
ChangeStreamIterable changeStreamIterable = mock(ChangeStreamIterable.class);
MongoCursor cursor = mock(MongoCursor.class);
lenient().when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
lenient().when(mongoDatabase.getCollection(anyString())).thenReturn(col);
lenient().when(col.watch()).thenReturn(changeStreamIterable);
lenient().when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable);
lenient().when(changeStreamIterable.iterator()).thenReturn(cursor);
lenient().when(cursor.hasNext()).thenReturn(true, true, false);
ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class);
ChangeStreamDocument streamDoc2 = mock(ChangeStreamDocument.class);
when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(anyString())).thenReturn(col);
when(col.watch()).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable);
when(changeStreamIterable.iterator()).thenReturn(cursor);
when(cursor.hasNext()).thenReturn(true, true, false);
ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class);
ChangeStreamDocument streamDoc2 = mock(ChangeStreamDocument.class);
Document doc1 = mock(Document.class);
Document doc2 = mock(Document.class);
BsonDocument bsonDoc1 = new BsonDocument("resumeToken1", new BsonInt32(123));
BsonDocument bsonDoc2 = new BsonDocument("resumeToken2", new BsonInt32(234));
when(streamDoc1.getResumeToken()).thenReturn(bsonDoc1);
when(streamDoc2.getResumeToken()).thenReturn(bsonDoc2);
lenient().when(cursor.next())
.thenReturn(streamDoc1)
.thenReturn(streamDoc2);
when(cursor.next())
.thenReturn(streamDoc1)
.thenReturn(streamDoc2);
when(streamDoc1.getFullDocument()).thenReturn(doc1);
when(streamDoc2.getFullDocument()).thenReturn(doc2);

Expand Down Expand Up @@ -142,7 +141,8 @@ void test_processStream_mongoClientFailure() {
}

@Test
void test_processStream_highCheckPointIntervalSuccess() {
void test_processStream_checkPointIntervalSuccess() {
when(mockSourceConfig.isAcknowledgmentsEnabled()).thenReturn(false)
when(streamProgressState.shouldWaitForExport()).thenReturn(false);
when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState));
when(streamPartition.getCollection()).thenReturn("database.collection");
Expand All @@ -151,15 +151,15 @@ void test_processStream_highCheckPointIntervalSuccess() {
MongoCollection col = mock(MongoCollection.class);
ChangeStreamIterable changeStreamIterable = mock(ChangeStreamIterable.class);
MongoCursor cursor = mock(MongoCursor.class);
lenient().when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
lenient().when(mongoDatabase.getCollection(anyString())).thenReturn(col);
lenient().when(col.watch()).thenReturn(changeStreamIterable);
lenient().when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable);
lenient().when(changeStreamIterable.iterator()).thenReturn(cursor);
lenient().when(cursor.hasNext()).thenReturn(true, true, true, false);
ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class);
ChangeStreamDocument streamDoc2 = mock(ChangeStreamDocument.class);
ChangeStreamDocument streamDoc3 = mock(ChangeStreamDocument.class);
when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(anyString())).thenReturn(col);
when(col.watch()).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable);
when(changeStreamIterable.iterator()).thenReturn(cursor);
when(cursor.hasNext()).thenReturn(true, true, true, false);
ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class);
ChangeStreamDocument streamDoc2 = mock(ChangeStreamDocument.class);
ChangeStreamDocument streamDoc3 = mock(ChangeStreamDocument.class);
Document doc1 = mock(Document.class);
Document doc2 = mock(Document.class);
Document doc3 = mock(Document.class);
Expand All @@ -169,10 +169,10 @@ void test_processStream_highCheckPointIntervalSuccess() {
when(streamDoc1.getResumeToken()).thenReturn(bsonDoc1);
when(streamDoc2.getResumeToken()).thenReturn(bsonDoc2);
when(streamDoc3.getResumeToken()).thenReturn(bsonDoc3);
lenient().when(cursor.next())
.thenReturn(streamDoc1)
.thenReturn(streamDoc2)
.thenReturn(streamDoc3);
when(cursor.next())
.thenReturn(streamDoc1)
.thenReturn(streamDoc2)
.thenReturn(streamDoc3);
when(streamDoc1.getFullDocument()).thenReturn(doc1);
when(streamDoc2.getFullDocument()).thenReturn(doc2);
when(streamDoc3.getFullDocument()).thenReturn(doc3);
Expand Down

0 comments on commit e7d3b94

Please sign in to comment.