From 91ff22d6da2b14d8a27ade89ee516341181c8bd6 Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Tue, 7 May 2024 12:03:33 -0500 Subject: [PATCH] Add metadata for primary key and primary key type and validations for Mongo/DocDb source (#4512) * Add metadata for primary key and primary key type and validations for Mongo/DocDb source Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --------- Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../transformer/DynamicConfigTransformer.java | 4 +- .../documentdb-function-expected.yaml | 2 +- .../expected/documentdb2-expected.yaml | 2 +- .../plugins/mongo/client/BsonHelper.java | 2 +- .../mongo/configuration/CollectionConfig.java | 2 + .../configuration/MongoDBSourceConfig.java | 11 +++ .../mongo/documentdb/DocumentDBService.java | 6 +- .../mongo/export/ExportPartitionWorker.java | 10 +-- .../plugins/mongo/stream/StreamWorker.java | 6 +- .../export/ExportPartitionWorkerTest.java | 13 +++- .../mongo/stream/StreamWorkerTest.java | 70 +++++++++++-------- 11 files changed, 84 insertions(+), 44 deletions(-) diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java index ff953c1cbc..a9e6b50f45 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java @@ -354,9 +354,9 @@ private boolean isJsonPath(String parameter) { */ public String calculateDepth(String s3Prefix) { if(s3Prefix == null){ - return Integer.toString(3); + return Integer.toString(4); } - return Integer.toString(s3Prefix.split("/").length + 3); + return Integer.toString(s3Prefix.split("/").length + 4); } public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){ diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb-function-expected.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb-function-expected.yaml index 3506d43f77..22f9bf1d1c 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb-function-expected.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb-function-expected.yaml @@ -16,4 +16,4 @@ docdb-pipeline-transformed: sink: - opensearch: hosts: "host" - depth: "5" + depth: "6" diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb2-expected.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb2-expected.yaml index 3d4d71f4df..4d60ee66a2 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb2-expected.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb2-expected.yaml @@ -15,7 +15,7 @@ documentdb-pipeline-writer: - bucket: name: "bucket-name1" filter: - depth: "5" + depth: "6" scheduling: interval: "1s" acknowledgments: true diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/BsonHelper.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/BsonHelper.java index cda5523c7e..3f3f656e24 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/BsonHelper.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/BsonHelper.java @@ -87,7 +87,7 @@ public class BsonHelper { private static final String REGEX_PATTERN = "pattern"; private static final String REGEX_OPTIONS = "options"; public static final String DOCUMENTDB_ID_FIELD_NAME = "_id"; - // public static final String DEFAULT_ID_MAPPING_FIELD_NAME = "doc_id"; + public static final JsonWriterSettings JSON_WRITER_SETTINGS = JsonWriterSettings.builder() .outputMode(JsonMode.RELAXED) .objectIdConverter((value, writer) -> writer.writeString(value.toHexString())) diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/CollectionConfig.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/CollectionConfig.java index 7814156b00..e1a6a69a70 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/CollectionConfig.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/CollectionConfig.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Pattern; import java.util.Arrays; import java.util.stream.Collectors; @@ -12,6 +13,7 @@ public class CollectionConfig { private static final int DEFAULT_PARTITION_COUNT = 100; private static final int DEFAULT_EXPORT_BATCH_SIZE = 10_000; @JsonProperty("collection") + @Pattern(regexp = ".+?\\..+", message = "Should be of pattern .") private @NotNull String collection; @JsonProperty("export") diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java index 1313a11d7d..37b7ec6716 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; import java.time.Duration; import java.util.ArrayList; @@ -29,6 +30,9 @@ public class MongoDBSourceConfig { @JsonProperty("read_preference") private String readPreference; @JsonProperty("collections") + @Valid + @NotNull + @Size(min = 1) private List collections; @JsonProperty("acknowledgments") private Boolean acknowledgments = false; @@ -60,6 +64,9 @@ public class MongoDBSourceConfig { @JsonProperty("disable_s3_read_for_leader") private boolean disableS3ReadForLeader = false; + @JsonProperty("id_key") + private String idKey; + public MongoDBSourceConfig() { this.readPreference = DEFAULT_READ_PREFERENCE; this.collections = new ArrayList<>(); @@ -133,6 +140,10 @@ public boolean isDisableS3ReadForLeader() { return disableS3ReadForLeader; } + public String getIdKey() { + return this.idKey; + } + public AwsConfig getAwsConfig() { return this.awsConfig; } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java index 233c45d10e..0ffba7153d 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java @@ -15,6 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -83,10 +84,11 @@ private String getS3PathPrefix() { } final String s3PathPrefix; + final Instant now = Instant.now(); if (sourceCoordinator.getPartitionPrefix() != null ) { - s3PathPrefix = s3UserPathPrefix + sourceCoordinator.getPartitionPrefix() + S3_PATH_DELIMITER; + s3PathPrefix = s3UserPathPrefix + sourceCoordinator.getPartitionPrefix() + S3_PATH_DELIMITER + now.toEpochMilli() + S3_PATH_DELIMITER; } else { - s3PathPrefix = s3UserPathPrefix; + s3PathPrefix = s3UserPathPrefix + now.toEpochMilli() + S3_PATH_DELIMITER; } return s3PathPrefix; } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorker.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorker.java index 03798c32f6..42fbe63f16 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorker.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorker.java @@ -178,12 +178,14 @@ public void run() { recordBytes.add(bytes); bytesReceivedSummary.record(bytes); final Optional primaryKeyDoc = Optional.ofNullable(document.toBsonDocument()); - final String primaryKeyBsonType = primaryKeyDoc.map(bsonDocument -> bsonDocument.getBsonType().name()).orElse(UNKNOWN_TYPE); + final String primaryKeyBsonType = primaryKeyDoc.map(bsonDocument -> bsonDocument.get(DOCUMENTDB_ID_FIELD_NAME).getBsonType().name()).orElse(UNKNOWN_TYPE); // The version number is the export time minus some overlap to ensure new stream events still get priority final long eventVersionNumber = (exportStartTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis()) * 1_000; final Event event = recordConverter.convert(record, exportStartTime, eventVersionNumber, primaryKeyBsonType); - // event.put(DEFAULT_ID_MAPPING_FIELD_NAME, event.get(DOCUMENTDB_ID_FIELD_NAME, Object.class)); + if (sourceConfig.getIdKey() !=null && !sourceConfig.getIdKey().isBlank()) { + event.put(sourceConfig.getIdKey(), event.get(DOCUMENTDB_ID_FIELD_NAME, Object.class)); + } // delete _id event.delete(DOCUMENTDB_ID_FIELD_NAME); records.add(event); @@ -207,7 +209,7 @@ public void run() { successRecords += 1; } catch (Exception e) { - LOG.error("Failed to add record to buffer with error {}", e.getMessage()); + LOG.error("Failed to add record to buffer with error.", e); failureItemsCounter.increment(records.size()); failedRecords += 1; } @@ -233,7 +235,7 @@ public void run() { } } catch (Exception e) { - LOG.error("Exception connecting to cluster and loading partition {}. Exception: {}", query, e.getMessage()); + LOG.error("Exception connecting to cluster and loading partition {}.", query, e); throw new RuntimeException(e); } finally { // Do final checkpoint when reaching end of partition or due to exception diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java index 341f6a7848..e497cafcd8 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java @@ -196,11 +196,13 @@ record = document.getFullDocument().toJson(JSON_WRITER_SETTINGS); checkPointToken = document.getResumeToken().toJson(JSON_WRITER_SETTINGS); final Optional primaryKeyDoc = Optional.ofNullable(document.getDocumentKey()); - final String primaryKeyBsonType = primaryKeyDoc.map(bsonDocument -> bsonDocument.getBsonType().name()).orElse(UNKNOWN_TYPE); + final String primaryKeyBsonType = primaryKeyDoc.map(bsonDocument -> bsonDocument.get(DOCUMENTDB_ID_FIELD_NAME).getBsonType().name()).orElse(UNKNOWN_TYPE); // TODO fix eventVersionNumber final Event event = recordConverter.convert(record, eventCreationTimeMillis, eventCreationTimeMillis, document.getOperationType(), primaryKeyBsonType); - // event.put(DEFAULT_ID_MAPPING_FIELD_NAME, event.get(DOCUMENTDB_ID_FIELD_NAME, Object.class)); + if (sourceConfig.getIdKey() !=null && !sourceConfig.getIdKey().isBlank()) { + event.put(sourceConfig.getIdKey(), event.get(DOCUMENTDB_ID_FIELD_NAME, Object.class)); + } // delete _id event.delete(DOCUMENTDB_ID_FIELD_NAME); records.add(event); diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java index ee22c5731e..66c038c0bb 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java @@ -8,10 +8,13 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import org.bson.BsonDocument; +import org.bson.BsonObjectId; +import org.bson.BsonString; import org.bson.BsonType; import org.bson.Document; import org.bson.conversions.Bson; import org.bson.json.JsonWriterSettings; +import org.bson.types.ObjectId; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -51,6 +54,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.mongo.client.BsonHelper.DOCUMENTDB_ID_FIELD_NAME; import static org.opensearch.dataprepper.plugins.mongo.export.ExportPartitionWorker.EXPORT_RECORDS_TOTAL_COUNT; import static org.opensearch.dataprepper.plugins.mongo.export.ExportPartitionWorker.BYTES_RECEIVED; import static org.opensearch.dataprepper.plugins.mongo.export.ExportPartitionWorker.BYTES_PROCESSED; @@ -132,10 +136,10 @@ public void testProcessPartitionSuccess(final String partitionKey) { Document doc2 = mock(Document.class); BsonDocument bsonDoc1 = mock(BsonDocument.class); BsonDocument bsonDoc2 = mock(BsonDocument.class); + when(bsonDoc1.get("_id")).thenReturn(new BsonObjectId(new ObjectId())); + when(bsonDoc2.get("_id")).thenReturn(new BsonString(UUID.randomUUID().toString())); when(doc1.toBsonDocument()).thenReturn(bsonDoc1); when(doc2.toBsonDocument()).thenReturn(bsonDoc2); - when(bsonDoc1.getBsonType()).thenReturn(BsonType.OBJECT_ID); - when(bsonDoc2.getBsonType()).thenReturn(BsonType.STRING); final String docJson1 = UUID.randomUUID().toString(); final String docJson2 = UUID.randomUUID() + docJson1; when(doc1.toJson(any(JsonWriterSettings.class))).thenReturn(docJson1); @@ -143,9 +147,12 @@ public void testProcessPartitionSuccess(final String partitionKey) { lenient().when(cursor.next()) .thenReturn(doc1) .thenReturn(doc2); + when(mockSourceConfig.getIdKey()).thenReturn("docdb_id"); final long eventVersionNumber = (exportStartTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis()) * 1_000; Event event1 = mock((Event.class)); Event event2 = mock((Event.class)); + when(event1.get("_id", Object.class)).thenReturn(UUID.randomUUID().toString()); + when(event2.get("_id", Object.class)).thenReturn(UUID.randomUUID().toString()); when(mockRecordConverter.convert(docJson1, exportStartTime, eventVersionNumber, BsonType.OBJECT_ID.name())).thenReturn(event1); when(mockRecordConverter.convert(docJson2, exportStartTime, eventVersionNumber, BsonType.STRING.name())).thenReturn(event2); lenient().when(dataQueryPartition.getPartitionKey()).thenReturn(partitionKey); @@ -183,6 +190,8 @@ public void testProcessPartitionSuccess(final String partitionKey) { verify(mongoDatabase).getCollection(eq("collection")); verify(mockRecordConverter).initializePartitions(partitions); verify(mockRecordBufferWriter).writeToBuffer(eq(mockAcknowledgementSet), any()); + verify(event1).put(mockSourceConfig.getIdKey(), event1.get(DOCUMENTDB_ID_FIELD_NAME, Object.class)); + verify(event2).put(mockSourceConfig.getIdKey(), event2.get(DOCUMENTDB_ID_FIELD_NAME, Object.class)); verify(exportRecordTotalCounter, times(2)).increment(); verify(successItemsCounter).increment(2.0); verify(bytesReceivedSummary).record(docJson1.getBytes().length); diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index 835c49415c..cec4d06825 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -10,12 +10,24 @@ import com.mongodb.client.model.changestream.OperationType; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; +import org.bson.BsonArray; +import org.bson.BsonBoolean; +import org.bson.BsonDateTime; +import org.bson.BsonDecimal128; import org.bson.BsonDocument; +import org.bson.BsonDouble; import org.bson.BsonInt32; +import org.bson.BsonInt64; +import org.bson.BsonObjectId; +import org.bson.BsonString; import org.bson.BsonTimestamp; import org.bson.BsonType; +import org.bson.BsonUndefined; +import org.bson.BsonValue; import org.bson.Document; import org.bson.json.JsonWriterSettings; +import org.bson.types.Decimal128; +import org.bson.types.ObjectId; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -59,6 +71,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.mongo.client.BsonHelper.DOCUMENTDB_ID_FIELD_NAME; import static org.opensearch.dataprepper.plugins.mongo.stream.StreamWorker.BYTES_RECEIVED; import static org.opensearch.dataprepper.plugins.mongo.stream.StreamWorker.FAILURE_ITEM_COUNTER_NAME; import static org.opensearch.dataprepper.plugins.mongo.stream.StreamWorker.SUCCESS_ITEM_COUNTER_NAME; @@ -91,7 +104,7 @@ public class StreamWorkerTest { private StreamWorker streamWorker; - private final Random random = new Random(); + private static final Random random = new Random(); @BeforeEach public void setup() { @@ -143,9 +156,9 @@ void test_processStream_success() { .thenReturn(streamDoc2); final String doc1Json1 = UUID.randomUUID().toString(); final String doc1Json2 = UUID.randomUUID().toString(); - when(doc1Key.getBsonType()).thenReturn(BsonType.INT64); + when(doc1Key.get("_id")).thenReturn(new BsonInt64(random.nextLong())); when(doc1.toJson(any(JsonWriterSettings.class))).thenReturn(doc1Json1); - when(doc2Key.getBsonType()).thenReturn(BsonType.INT32); + when(doc2Key.get("_id")).thenReturn(new BsonInt32(random.nextInt())); when(doc2Key.toJson(any(JsonWriterSettings.class))).thenReturn(doc1Json2); when(streamDoc1.getFullDocument()).thenReturn(doc1); when(streamDoc1.getDocumentKey()).thenReturn(doc1Key); @@ -163,7 +176,8 @@ void test_processStream_success() { when(s3PartitionStatus.getPartitions()).thenReturn(partitions); when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus)); Event event = mock(Event.class); - //when(event.get("_id", Object.class)).thenReturn(UUID.randomUUID().toString()); + when(mockSourceConfig.getIdKey()).thenReturn("docdb_id"); + when(event.get("_id", Object.class)).thenReturn(UUID.randomUUID().toString()); when(mockRecordConverter.convert(anyString(), anyLong(), anyLong(), any(OperationType.class), anyString())).thenReturn(event); final ExecutorService executorService = Executors.newSingleThreadExecutor(); final Future future = executorService.submit(() -> { @@ -182,6 +196,7 @@ void test_processStream_success() { verify(mockRecordConverter).convert(eq(doc1Json1), eq(timeSecond1 * 1000L), eq(timeSecond1 * 1000L), eq(OperationType.INSERT), eq(BsonType.INT64.name())); verify(mockRecordConverter).convert(eq(doc1Json2), eq(timeSecond2 * 1000L), eq(timeSecond2 * 1000L), eq(OperationType.DELETE), eq(BsonType.INT32.name())); verify(mockRecordBufferWriter).writeToBuffer(eq(null), any()); + verify(event, times(2)).put(mockSourceConfig.getIdKey(), event.get(DOCUMENTDB_ID_FIELD_NAME, Object.class)); verify(successItemsCounter).increment(2); verify(failureItemsCounter, never()).increment(); verify(mockPartitionCheckpoint, atLeast(2)).checkpoint("{\"resumeToken2\": 234}", 2); @@ -274,7 +289,6 @@ void test_processStream_checkPointIntervalSuccess() { when(s3PartitionStatus.getPartitions()).thenReturn(partitions); when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus)); Event event = mock(Event.class); - //when(event.get("_id", Object.class)).thenReturn(UUID.randomUUID().toString()); when(mockRecordConverter.convert(anyString(), anyLong(), anyLong(), any(OperationType.class), anyString())).thenReturn(event); try (MockedStatic mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) { @@ -368,7 +382,7 @@ void test_processStream_terminateChangeStreamSuccess() { when(doc1.toJson(any(JsonWriterSettings.class))).thenReturn(doc1Json1); when(streamDoc1.getFullDocument()).thenReturn(doc1); when(streamDoc1.getDocumentKey()).thenReturn(keyDoc1); - when(keyDoc1.getBsonType()).thenReturn(BsonType.BOOLEAN); + when(keyDoc1.get("_id")).thenReturn(new BsonBoolean(random.nextBoolean())); final OperationType operationType1 = OperationType.INSERT; when(streamDoc1.getOperationType()).thenReturn(operationType1); final BsonTimestamp bsonTimestamp1 = mock(BsonTimestamp.class); @@ -380,7 +394,6 @@ void test_processStream_terminateChangeStreamSuccess() { when(s3PartitionStatus.getPartitions()).thenReturn(partitions); when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus)); Event event = mock(Event.class); - //when(event.get("_id", Object.class)).thenReturn(UUID.randomUUID().toString()); when(mockRecordConverter.convert(anyString(), anyLong(), anyLong(), any(OperationType.class), anyString())).thenReturn(event); final ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(() -> { @@ -405,7 +418,7 @@ void test_processStream_terminateChangeStreamSuccess() { @ParameterizedTest @MethodSource("mongoDataTypeProvider") - void test_processStream_dataTypeConversionSuccess(final String actualDocument, final String keyType, final String expectedDocument) { + void test_processStream_dataTypeConversionSuccess(final String actualDocument, final BsonValue bsonValue, final String expectedDocument) { final String collection = "database.collection"; when(streamProgressState.shouldWaitForExport()).thenReturn(false); when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); @@ -430,7 +443,7 @@ void test_processStream_dataTypeConversionSuccess(final String actualDocument, f when(streamDoc1.getOperationType()).thenReturn(OperationType.INSERT); when(cursor.next()) .thenReturn(streamDoc1); - when(key1.getBsonType()).thenReturn(BsonType.valueOf(keyType)); + when(key1.get("_id")).thenReturn(bsonValue); when(streamDoc1.getDocumentKey()).thenReturn(key1); when(streamDoc1.getFullDocument()).thenReturn(doc1); final OperationType operationType1 = OperationType.INSERT; @@ -444,7 +457,6 @@ void test_processStream_dataTypeConversionSuccess(final String actualDocument, f when(s3PartitionStatus.getPartitions()).thenReturn(partitions); when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus)); Event event = mock(Event.class); - //when(event.get("_id", Object.class)).thenReturn(UUID.randomUUID().toString()); when(mockRecordConverter.convert(anyString(), anyLong(), anyLong(), any(OperationType.class), anyString())).thenReturn(event); final ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(() -> { @@ -460,7 +472,7 @@ void test_processStream_dataTypeConversionSuccess(final String actualDocument, f verify(mongoDatabase).getCollection(eq("collection")); verify(mockPartitionCheckpoint).getGlobalS3FolderCreationStatus(collection); verify(mockRecordConverter).initializePartitions(partitions); - verify(mockRecordConverter).convert(eq(expectedDocument), eq(timeSecond1 * 1000L), eq(timeSecond1 * 1000L), eq(operationType1), eq(keyType)); + verify(mockRecordConverter).convert(eq(expectedDocument), eq(timeSecond1 * 1000L), eq(timeSecond1 * 1000L), eq(operationType1), eq(bsonValue.getBsonType().name())); verify(mockRecordBufferWriter).writeToBuffer(eq(null), any()); verify(successItemsCounter).increment(1); verify(failureItemsCounter, never()).increment(); @@ -471,71 +483,71 @@ private static Stream mongoDataTypeProvider() { return Stream.of( Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"name\": \"Hello User\"}", - BsonType.BOOLEAN.name(), - "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"name\": \"Hello User\"}"), + new BsonBoolean(random.nextBoolean()), + "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"name\": \"Hello User\"}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"nullField\": null}", - BsonType.ARRAY.name(), + new BsonArray(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"nullField\": null}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"numberField\": 123}", - BsonType.DATE_TIME.name(), + new BsonDateTime(Math.abs(random.nextLong())), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"numberField\": 123}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"doubleValue\": 3.14159}", - BsonType.TIMESTAMP.name(), + new BsonTimestamp(Math.abs(random.nextLong())), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"doubleValue\": 3.14159}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"longValue\": { \"$numberLong\": \"1234567890123456768\"}}", - BsonType.OBJECT_ID.name(), + new BsonObjectId(new ObjectId()), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"longValue\": 1234567890123456768}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"stringField\": \"Hello, Mongo!\"}", - BsonType.DOCUMENT.name(), + new BsonDocument(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"stringField\": \"Hello, Mongo!\"}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"booleanField\": true}", - BsonType.DOUBLE.name(), + new BsonDouble(random.nextDouble()), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"booleanField\": true}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"dateField\": { \"$date\": \"2024-05-03T13:57:51.155Z\"}}", - BsonType.DECIMAL128.name(), + new BsonDecimal128(new Decimal128(random.nextLong())), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"dateField\": 1714744671155}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"arrayField\": [\"a\",\"b\",\"c\"]}", - BsonType.UNDEFINED.name(), + new BsonUndefined(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"arrayField\": [\"a\", \"b\", \"c\"]}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"objectField\": { \"nestedKey\": \"nestedValue\"}}", - BsonType.BINARY.name(), + new BsonDocument(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"objectField\": {\"nestedKey\": \"nestedValue\"}}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"binaryField\": { \"$binary\": {\"base64\": \"AQIDBA==\", \"subType\": \"00\"}}}", - BsonType.STRING.name(), + new BsonString(UUID.randomUUID().toString()), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"binaryField\": \"AQIDBA==\"}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"objectIdField\": { \"$oid\": \"6634ed693ac62386d57b12d0\" }}", - BsonType.ARRAY.name(), + new BsonString(UUID.randomUUID().toString()), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"objectIdField\": \"6634ed693ac62386d57b12d0\"}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"timestampField\": { \"$timestamp\": {\"t\": 1714744681, \"i\": 29}}}", - BsonType.OBJECT_ID.name(), + new BsonObjectId(new ObjectId()), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"timestampField\": 7364772325884952605}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"regexField\": { \"$regularExpression\": {\"pattern\": \"^ABC\", \"options\": \"i\"}}}", - BsonType.OBJECT_ID.name(), + new BsonObjectId(new ObjectId()), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"regexField\": {\"pattern\": \"^ABC\", \"options\": \"i\"}}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"minKeyField\": { \"$minKey\": 1}}", - BsonType.OBJECT_ID.name(), + new BsonObjectId(new ObjectId()), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"minKeyField\": null}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"maxKeyField\": { \"$maxKey\": 1}}", - BsonType.OBJECT_ID.name(), + new BsonObjectId(new ObjectId()), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"maxKeyField\": null}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"bigDecimalField\": { \"$numberDecimal\": \"123456789.0123456789\"}}", - BsonType.OBJECT_ID.name(), + new BsonObjectId(new ObjectId()), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"bigDecimalField\": \"123456789.0123456789\"}") ); }