diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java index 610c89d67b..3acfe2ee35 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java @@ -5,6 +5,8 @@ import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Aggregates; +import com.mongodb.client.model.Projections; import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.changestream.OperationType; @@ -124,7 +126,9 @@ public StreamWorker(final RecordBufferWriter recordBufferWriter, private MongoCursor> getChangeStreamCursor(final MongoCollection collection, final String resumeToken ) { - final ChangeStreamIterable changeStreamIterable = collection.watch().batchSize(streamBatchSize); + final ChangeStreamIterable changeStreamIterable = collection.watch( + List.of(Aggregates.project(Projections.exclude("updateDescription")))) + .batchSize(streamBatchSize); if (resumeToken == null) { return changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP).iterator(); diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index 7c37cb48c5..daf4e8f22d 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -60,6 +60,7 @@ import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -135,7 +136,7 @@ void test_processStream_success() { MongoCursor cursor = mock(MongoCursor.class); when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); when(mongoDatabase.getCollection(anyString())).thenReturn(col); - when(col.watch()).thenReturn(changeStreamIterable); + when(col.watch(anyList())).thenReturn(changeStreamIterable); when(changeStreamIterable.batchSize(1000)).thenReturn(changeStreamIterable); when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable); when(changeStreamIterable.iterator()).thenReturn(cursor); @@ -232,7 +233,7 @@ void test_processStream_checkPointIntervalSuccess() { MongoCursor cursor = mock(MongoCursor.class); when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); when(mongoDatabase.getCollection(anyString())).thenReturn(col); - when(col.watch()).thenReturn(changeStreamIterable); + when(col.watch(anyList())).thenReturn(changeStreamIterable); when(changeStreamIterable.batchSize(1000)).thenReturn(changeStreamIterable); when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable); when(changeStreamIterable.iterator()).thenReturn(cursor); @@ -323,7 +324,7 @@ void test_processStream_stopWorker() { MongoCursor cursor = mock(MongoCursor.class); when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); when(mongoDatabase.getCollection(anyString())).thenReturn(col); - when(col.watch()).thenReturn(changeStreamIterable); + when(col.watch(anyList())).thenReturn(changeStreamIterable); when(changeStreamIterable.batchSize(1000)).thenReturn(changeStreamIterable); when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable); when(changeStreamIterable.iterator()).thenReturn(cursor); @@ -361,7 +362,7 @@ void test_processStream_terminateChangeStreamSuccess() { MongoCursor cursor = mock(MongoCursor.class); when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); when(mongoDatabase.getCollection(anyString())).thenReturn(col); - when(col.watch()).thenReturn(changeStreamIterable); + when(col.watch(anyList())).thenReturn(changeStreamIterable); when(changeStreamIterable.batchSize(1000)).thenReturn(changeStreamIterable); when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable); when(changeStreamIterable.iterator()).thenReturn(cursor); @@ -430,7 +431,7 @@ void test_processStream_dataTypeConversionSuccess(final String actualDocument, f MongoCursor cursor = mock(MongoCursor.class); when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); when(mongoDatabase.getCollection(anyString())).thenReturn(col); - when(col.watch()).thenReturn(changeStreamIterable); + when(col.watch(anyList())).thenReturn(changeStreamIterable); when(changeStreamIterable.batchSize(1000)).thenReturn(changeStreamIterable); when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable); when(changeStreamIterable.iterator()).thenReturn(cursor);