Skip to content

Commit

Permalink
Exclude updateDescription field from Mongo/DocDB change stream (opens…
Browse files Browse the repository at this point in the history
…earch-project#4516)

Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh authored May 8, 2024
1 parent ad1ab2c commit f23972c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.OperationType;
Expand Down Expand Up @@ -52,6 +54,7 @@ public class StreamWorker {
static final String FAILURE_ITEM_COUNTER_NAME = "changeEventsProcessingErrors";
static final String BYTES_RECEIVED = "bytesReceived";
private static final long MILLI_SECOND = 1_000_000L;
private static final String UPDATE_DESCRIPTION = "updateDescription";
private final RecordBufferWriter recordBufferWriter;
private final PartitionKeyRecordConverter recordConverter;
private final DataStreamPartitionCheckpoint partitionCheckpoint;
Expand Down Expand Up @@ -124,7 +127,9 @@ public StreamWorker(final RecordBufferWriter recordBufferWriter,
private MongoCursor<ChangeStreamDocument<Document>> getChangeStreamCursor(final MongoCollection<Document> collection,
final String resumeToken
) {
final ChangeStreamIterable<Document> changeStreamIterable = collection.watch().batchSize(streamBatchSize);
final ChangeStreamIterable<Document> changeStreamIterable = collection.watch(
List.of(Aggregates.project(Projections.exclude(UPDATE_DESCRIPTION))))
.batchSize(streamBatchSize);

if (resumeToken == null) {
return changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -135,7 +136,7 @@ void test_processStream_success() {
MongoCursor cursor = mock(MongoCursor.class);
when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(anyString())).thenReturn(col);
when(col.watch()).thenReturn(changeStreamIterable);
when(col.watch(anyList())).thenReturn(changeStreamIterable);
when(changeStreamIterable.batchSize(1000)).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable);
when(changeStreamIterable.iterator()).thenReturn(cursor);
Expand Down Expand Up @@ -232,7 +233,7 @@ void test_processStream_checkPointIntervalSuccess() {
MongoCursor cursor = mock(MongoCursor.class);
when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(anyString())).thenReturn(col);
when(col.watch()).thenReturn(changeStreamIterable);
when(col.watch(anyList())).thenReturn(changeStreamIterable);
when(changeStreamIterable.batchSize(1000)).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable);
when(changeStreamIterable.iterator()).thenReturn(cursor);
Expand Down Expand Up @@ -323,7 +324,7 @@ void test_processStream_stopWorker() {
MongoCursor cursor = mock(MongoCursor.class);
when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(anyString())).thenReturn(col);
when(col.watch()).thenReturn(changeStreamIterable);
when(col.watch(anyList())).thenReturn(changeStreamIterable);
when(changeStreamIterable.batchSize(1000)).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable);
when(changeStreamIterable.iterator()).thenReturn(cursor);
Expand Down Expand Up @@ -361,7 +362,7 @@ void test_processStream_terminateChangeStreamSuccess() {
MongoCursor cursor = mock(MongoCursor.class);
when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(anyString())).thenReturn(col);
when(col.watch()).thenReturn(changeStreamIterable);
when(col.watch(anyList())).thenReturn(changeStreamIterable);
when(changeStreamIterable.batchSize(1000)).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable);
when(changeStreamIterable.iterator()).thenReturn(cursor);
Expand Down Expand Up @@ -430,7 +431,7 @@ void test_processStream_dataTypeConversionSuccess(final String actualDocument, f
MongoCursor cursor = mock(MongoCursor.class);
when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(anyString())).thenReturn(col);
when(col.watch()).thenReturn(changeStreamIterable);
when(col.watch(anyList())).thenReturn(changeStreamIterable);
when(changeStreamIterable.batchSize(1000)).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable);
when(changeStreamIterable.iterator()).thenReturn(cursor);
Expand Down

0 comments on commit f23972c

Please sign in to comment.