Skip to content

Commit

Permalink
Support default mapping for Mongo/DocumentDB data types
Browse files Browse the repository at this point in the history
Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh committed May 3, 2024
1 parent 38bae6e commit e0a6b0c
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -49,6 +50,10 @@ public class StreamWorker {
// Metric name: count of stream records successfully written to the buffer.
static final String SUCCESS_ITEM_COUNTER_NAME = "streamRecordsSuccessTotal";
// Metric name: count of stream records that failed to be written to the buffer.
static final String FAILURE_ITEM_COUNTER_NAME = "streamRecordsFailedTotal";
// Metric name: total bytes received from the change stream.
static final String BYTES_RECEIVED = "bytesReceived";
// JSON field names emitted when a BSON regular expression is serialized
// (see the regularExpressionConverter in writerSettings below).
private static final String REGEX_PATTERN = "pattern";
private static final String REGEX_OPTIONS = "options";
// MongoDB's reserved primary-key field name on every document.
public static final String MONGO_ID_FIELD_NAME = "_id";
// Field the "_id" value is copied into before "_id" itself is deleted from the
// event, so sinks that reject a leading-underscore field still get the key.
public static final String DEFAULT_ID_MAPPING_FIELD_NAME = "doc_id";
private final RecordBufferWriter recordBufferWriter;
private final PartitionKeyRecordConverter recordConverter;
private final DataStreamPartitionCheckpoint partitionCheckpoint;
Expand All @@ -72,6 +77,20 @@ public class StreamWorker {
// JSON writer settings that flatten Mongo/DocumentDB-specific BSON types into
// plain JSON scalars/objects so downstream sinks receive simple values instead
// of extended-JSON wrappers (e.g. {"$oid": ...}, {"$numberDecimal": ...}).
private final JsonWriterSettings writerSettings = JsonWriterSettings.builder()
.outputMode(JsonMode.RELAXED)
// ObjectId -> 24-char hex string
.objectIdConverter((value, writer) -> writer.writeString(value.toHexString()))
// Binary -> Base64-encoded string (subtype byte is dropped)
.binaryConverter((value, writer) -> writer.writeString(Base64.getEncoder().encodeToString(value.getData())))
// DateTime -> epoch value written as a JSON number
.dateTimeConverter((value, writer) -> writer.writeNumber(String.valueOf(value.longValue())))
// Decimal128 -> plain (non-scientific-notation) decimal string
.decimal128Converter((value, writer) -> writer.writeString(value.bigDecimalValue().toPlainString()))
// Double -> JSON number
.doubleConverter((value, writer) -> writer.writeNumber(String.valueOf(value.doubleValue())))
// MaxKey/MinKey have no JSON equivalent -> null
.maxKeyConverter((value, writer) -> writer.writeNull())
// (same rationale as maxKey)
.minKeyConverter((value, writer) -> writer.writeNull())
// Regular expression -> {"pattern": "...", "options": "..."} object
.regularExpressionConverter((value, writer) -> {
writer.writeStartObject();
writer.writeString(REGEX_PATTERN, value.getPattern());
writer.writeString(REGEX_OPTIONS, value.getOptions());
writer.writeEndObject();
})
// BSON Timestamp -> its raw 64-bit value ((seconds << 32) | increment) as a number
.timestampConverter((value, writer) -> writer.writeNumber(String.valueOf(value.getValue())))
// Undefined (deprecated BSON type) -> null
.undefinedConverter((value, writer) -> writer.writeNull())
.build();

public static StreamWorker create(final RecordBufferWriter recordBufferWriter,
Expand Down Expand Up @@ -201,6 +220,9 @@ record = document.getFullDocument().toJson(writerSettings);
checkPointToken = document.getResumeToken().toJson(writerSettings);
// TODO fix eventVersionNumber
final Event event = recordConverter.convert(record, eventCreationTimeMillis, eventCreationTimeMillis, document.getOperationTypeString());
event.put(DEFAULT_ID_MAPPING_FIELD_NAME, event.get(MONGO_ID_FIELD_NAME, Object.class));
// delete _id
event.delete(MONGO_ID_FIELD_NAME);
records.add(event);
recordCount += 1;

Expand All @@ -221,6 +243,7 @@ record = document.getFullDocument().toJson(writerSettings);
}
} catch(Exception e){
// TODO handle documents with size > 10 MB.
// collection.find(new Document("_id", "key")).first();
// this will only happen if writing to buffer gets interrupted from shutdown,
// otherwise it's infinite backoff and retry
LOG.error("Failed to add records to buffer with error", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.mongo.buffer.RecordBufferWriter;
import org.opensearch.dataprepper.plugins.mongo.client.MongoDBConnection;
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
Expand All @@ -42,6 +43,7 @@
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
Expand Down Expand Up @@ -361,7 +363,7 @@ void test_processStream_terminateChangeStreamSuccess() {
when(s3PartitionStatus.getPartitions()).thenReturn(partitions);
when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus));
final ExecutorService executorService = Executors.newSingleThreadExecutor();
final Future<?> future = executorService.submit(() -> {
executorService.submit(() -> {
try (MockedStatic<MongoDBConnection> mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) {
mongoDBConnectionMockedStatic.when(() -> MongoDBConnection.getMongoClient(any(MongoDBSourceConfig.class)))
.thenReturn(mongoClient);
Expand All @@ -380,4 +382,152 @@ void test_processStream_terminateChangeStreamSuccess() {
verify(failureItemsCounter, never()).increment();
verify(mockPartitionCheckpoint).resetCheckpoint();
}

// Verifies the end-to-end default type mapping: a change-stream document containing
// every special BSON type (ObjectId, Long, Date, Binary, Timestamp, RegEx, MinKey,
// MaxKey, Decimal128, nested objects/arrays/null) is serialized with writerSettings
// into plain JSON scalars before being handed to the record converter, and the
// resulting event is written to the buffer with success metrics recorded.
@Test
void test_processStream_dataTypeConversionSuccess() {
final String collection = "database.collection";
when(streamProgressState.shouldWaitForExport()).thenReturn(false);
when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState));
when(streamPartition.getCollection()).thenReturn(collection);
// Mock the full client -> database -> collection -> change-stream chain.
MongoClient mongoClient = mock(MongoClient.class);
MongoDatabase mongoDatabase = mock(MongoDatabase.class);
MongoCollection col = mock(MongoCollection.class);
ChangeStreamIterable changeStreamIterable = mock(ChangeStreamIterable.class);
MongoCursor cursor = mock(MongoCursor.class);
when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(anyString())).thenReturn(col);
when(col.watch()).thenReturn(changeStreamIterable);
when(changeStreamIterable.batchSize(1000)).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable);
when(changeStreamIterable.iterator()).thenReturn(cursor);
// Cursor yields exactly one document, then the stream ends.
when(cursor.hasNext()).thenReturn(true, false);
ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class);
// Extended-JSON input document exercising every BSON type the writerSettings
// converters handle; parsed into a Document below.
var document = "{" +
"\"_id\": {" +
"\"$oid\": \"6634ed693ac62386d57bcaf0\"" +
"}," +
"\"name\": \"Hello User\"," +
"\"doc\": {" +
"\"id\": {" +
"\"key1\": \"value1\"," +
"\"key2\": \"value2\"" +
"}," +
"\"nullField\": null," +
"\"numberField\": 123," +
"\"longValue\": {" +
"\"$numberLong\": \"1234567890123456768\"" +
"}," +
"\"stringField\": \"Hello, Mongo!\"," +
"\"booleanField\": true," +
"\"dateField\": {" +
"\"$date\": \"2024-05-03T13:57:51.155Z\"" +
"}," +
"\"arrayField\": [" +
"\"a\"," +
"\"b\"," +
"\"c\"" +
"]," +
"\"objectField\": {" +
"\"nestedKey\": \"nestedValue\"" +
"}," +
"\"binaryField\": {" +
"\"$binary\": {" +
"\"base64\": \"AQIDBA==\"," +
"\"subType\": \"00\"" +
"}" +
"}," +
"\"objectIdField\": {" +
"\"$oid\": \"6634ed5f3ac62386d57bcaef\"" +
"}," +
"\"timestampField\": {" +
"\"$timestamp\": {" +
"\"t\": 1714744681," +
"\"i\": 29" +
"}" +
"}," +
"\"regexField\": {" +
"\"$regularExpression\": {" +
"\"pattern\": \"pattern\"," +
"\"options\": \"i\"" +
"}" +
"}," +
"\"minKeyField\": {" +
"\"$minKey\": 1" +
"}," +
"\"maxKeyField\": {" +
"\"$maxKey\": 1" +
"}" +
"}," +
"\"price128\": {" +
"\"$numberDecimal\": \"123.45\"" +
"}" +
"}";
Document doc1 = Document.parse(document);
BsonDocument bsonDoc1 = new BsonDocument("resumeToken1", new BsonInt32(123));
when(streamDoc1.getResumeToken()).thenReturn(bsonDoc1);
when(streamDoc1.getOperationType()).thenReturn(OperationType.INSERT);
when(cursor.next())
.thenReturn(streamDoc1);
when(streamDoc1.getFullDocument()).thenReturn(doc1);
final String operationType1 = UUID.randomUUID().toString();
when(streamDoc1.getOperationTypeString()).thenReturn(operationType1);
// Cluster time seconds drive the event-creation-time (seconds * 1000) argument
// asserted against the converter below.
final BsonTimestamp bsonTimestamp1 = mock(BsonTimestamp.class);
final int timeSecond1 = random.nextInt();
when(bsonTimestamp1.getTime()).thenReturn(timeSecond1);
when(streamDoc1.getClusterTime()).thenReturn(bsonTimestamp1);
S3PartitionStatus s3PartitionStatus = mock(S3PartitionStatus.class);
final List<String> partitions = List.of("first", "second");
when(s3PartitionStatus.getPartitions()).thenReturn(partitions);
when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus));
// Stub the converter to return a mock event so the _id -> doc_id remapping path
// (event.get("_id") then delete) executes without touching a real event model.
Event event = mock(Event.class);
when(event.get("_id", Object.class)).thenReturn(UUID.randomUUID().toString());
when(mockRecordConverter.convert(anyString(), anyLong(), anyLong(), anyString())).thenReturn(event);
// processStream blocks, so run it on a worker thread with the static
// MongoDBConnection factory mocked for that thread only.
final ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
try (MockedStatic<MongoDBConnection> mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) {
mongoDBConnectionMockedStatic.when(() -> MongoDBConnection.getMongoClient(any(MongoDBSourceConfig.class)))
.thenReturn(mongoClient);
streamWorker.processStream(streamPartition);
}
});
// The client is closed when the cursor is exhausted; wait for that as the
// completion signal before verifying interactions.
await()
.atMost(Duration.ofSeconds(10))
.untilAsserted(() -> verify(mongoClient).close());
verify(mongoDatabase).getCollection(eq("collection"));
verify(mockPartitionCheckpoint).getGlobalS3FolderCreationStatus(collection);
verify(mockRecordConverter).initializePartitions(partitions);
// Expected post-conversion JSON: ObjectIds as hex strings, $date as epoch millis,
// $timestamp as (t << 32) | i, binary as Base64, Decimal128 as a plain string,
// regex as a {pattern, options} object, min/max keys as null.
final String expectedRecord = "{" +
"\"_id\": \"6634ed693ac62386d57bcaf0\", " +
"\"name\": \"Hello User\", " +
"\"doc\": {" +
"\"id\": {\"key1\": \"value1\", " +
"\"key2\": \"value2\"" +
"}, " +
"\"nullField\": null, " +
"\"numberField\": 123, " +
"\"longValue\": 1234567890123456768, " +
"\"stringField\": \"Hello, Mongo!\", " +
"\"booleanField\": true, " +
"\"dateField\": 1714744671155, " +
"\"arrayField\": [\"a\", \"b\", \"c\"], " +
"\"objectField\": {" +
"\"nestedKey\": \"nestedValue\"" +
"}, " +
"\"binaryField\": \"AQIDBA==\", " +
"\"objectIdField\": \"6634ed5f3ac62386d57bcaef\", " +
"\"timestampField\": 7364772325884952605, " +
"\"regexField\": {" +
"\"pattern\": \"pattern\", " +
"\"options\": \"i\"" +
"}, " +
"\"minKeyField\": null, " +
"\"maxKeyField\": null}, " +
"\"price128\": \"123.45\"" +
"}";
verify(mockRecordConverter).convert(eq(expectedRecord), eq(timeSecond1 * 1000L), eq(timeSecond1 * 1000L), eq(operationType1));
verify(mockRecordBufferWriter).writeToBuffer(eq(null), any());
verify(successItemsCounter).increment(1);
verify(failureItemsCounter, never()).increment();
verify(mockPartitionCheckpoint).resetCheckpoint();
}
}

0 comments on commit e0a6b0c

Please sign in to comment.