Skip to content

Commit

Permalink
Exclude updateDescription field from Mongo/DocDB change stream
Browse files Browse the repository at this point in the history
Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh committed May 8, 2024
1 parent ad1ab2c commit 6d12038
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.OperationType;
Expand Down Expand Up @@ -124,7 +126,9 @@ public StreamWorker(final RecordBufferWriter recordBufferWriter,
private MongoCursor<ChangeStreamDocument<Document>> getChangeStreamCursor(final MongoCollection<Document> collection,
final String resumeToken
) {
final ChangeStreamIterable<Document> changeStreamIterable = collection.watch().batchSize(streamBatchSize);
final ChangeStreamIterable<Document> changeStreamIterable = collection.watch(
List.of(Aggregates.project(Projections.exclude("updateDescription"))))
.batchSize(streamBatchSize);

if (resumeToken == null) {
return changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -135,7 +136,7 @@ void test_processStream_success() {
MongoCursor cursor = mock(MongoCursor.class);
when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(anyString())).thenReturn(col);
when(col.watch()).thenReturn(changeStreamIterable);
when(col.watch(anyList())).thenReturn(changeStreamIterable);
when(changeStreamIterable.batchSize(1000)).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable);
when(changeStreamIterable.iterator()).thenReturn(cursor);
Expand Down Expand Up @@ -232,7 +233,7 @@ void test_processStream_checkPointIntervalSuccess() {
MongoCursor cursor = mock(MongoCursor.class);
when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(anyString())).thenReturn(col);
when(col.watch()).thenReturn(changeStreamIterable);
when(col.watch(anyList())).thenReturn(changeStreamIterable);
when(changeStreamIterable.batchSize(1000)).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable);
when(changeStreamIterable.iterator()).thenReturn(cursor);
Expand Down Expand Up @@ -323,7 +324,7 @@ void test_processStream_stopWorker() {
MongoCursor cursor = mock(MongoCursor.class);
when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(anyString())).thenReturn(col);
when(col.watch()).thenReturn(changeStreamIterable);
when(col.watch(anyList())).thenReturn(changeStreamIterable);
when(changeStreamIterable.batchSize(1000)).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable);
when(changeStreamIterable.iterator()).thenReturn(cursor);
Expand Down Expand Up @@ -361,7 +362,7 @@ void test_processStream_terminateChangeStreamSuccess() {
MongoCursor cursor = mock(MongoCursor.class);
when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(anyString())).thenReturn(col);
when(col.watch()).thenReturn(changeStreamIterable);
when(col.watch(anyList())).thenReturn(changeStreamIterable);
when(changeStreamIterable.batchSize(1000)).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable);
when(changeStreamIterable.iterator()).thenReturn(cursor);
Expand Down Expand Up @@ -430,7 +431,7 @@ void test_processStream_dataTypeConversionSuccess(final String actualDocument, f
MongoCursor cursor = mock(MongoCursor.class);
when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(anyString())).thenReturn(col);
when(col.watch()).thenReturn(changeStreamIterable);
when(col.watch(anyList())).thenReturn(changeStreamIterable);
when(changeStreamIterable.batchSize(1000)).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable);
when(changeStreamIterable.iterator()).thenReturn(cursor);
Expand Down

0 comments on commit 6d12038

Please sign in to comment.