From 34080553a81b4928391dca9cb321d160c1f2a10e Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 11 Mar 2024 17:52:02 -0500 Subject: [PATCH] Initial Mongo/DocumentDB source Configuration and Client Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../dataprepper/plugins/mongo/client/MongoDBHelper.java | 5 +++-- .../plugins/mongo/configuration/MongoDBSourceConfig.java | 7 ++++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/data-prepper-plugins/mongo-source/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBHelper.java b/data-prepper-plugins/mongo-source/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBHelper.java index 1b79b5b7f2..88696f6b52 100644 --- a/data-prepper-plugins/mongo-source/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBHelper.java +++ b/data-prepper-plugins/mongo-source/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBHelper.java @@ -28,7 +28,7 @@ import static com.mongodb.client.model.Filters.lte; public class MongoDBHelper { - private static final String MONGO_CONNECTION_STRING_TEMPLATE = "mongodb://%s:%s@%s:%s/?replicaSet=rs0&directConnection=true&readpreference=%s&ssl=%s&tlsAllowInvalidHostnames=%s&directConnection=true"; + private static final String MONGO_CONNECTION_STRING_TEMPLATE = "mongodb://%s:%s@%s:%s/?replicaSet=rs0&directConnection=true&readpreference=%s&ssl=%s&tlsAllowInvalidHostnames=%s&directConnection=%s"; private static final String BINARY_PARTITION_FORMAT = "%s-%s"; private static final String BINARY_PARTITION_SPLITTER = "-"; private static final String TIMESTAMP_PARTITION_FORMAT = "%s-%s"; @@ -62,8 +62,9 @@ private static String getConnectionString(final MongoDBSourceConfig sourceConfig final String ssl = sourceConfig.getSSLEnabled().toString(); final String invalidHostAllowed = sourceConfig.getSSLInvalidHostAllowed().toString(); final String readPreference = sourceConfig.getReadPreference(); + final String directionConnection = sourceConfig.getDirectConnection().toString(); return String.format(MONGO_CONNECTION_STRING_TEMPLATE, username, password, hostname, port, - readPreference, ssl, invalidHostAllowed); + readPreference, ssl, invalidHostAllowed, directionConnection); } public static String getPartitionStringFromMongoDBId(Object id, String className) { diff --git a/data-prepper-plugins/mongo-source/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java b/data-prepper-plugins/mongo-source/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java index 4635d7703c..31487834c7 100644 --- a/data-prepper-plugins/mongo-source/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java +++ b/data-prepper-plugins/mongo-source/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java @@ -22,7 +22,6 @@ public class MongoDBSourceConfig { private String trustStorePassword; @JsonProperty("credentials") private CredentialsConfig credentialsConfig; - @JsonProperty("snapshot_fetch_size") private String snapshotFetchSize; @JsonProperty("read_preference") @@ -35,6 +34,8 @@ public class MongoDBSourceConfig { private Boolean ssl; @JsonProperty("ssl_invalid_host_allowed") private Boolean sslInvalidHostAllowed; + @JsonProperty("directConnection") + private Boolean directConnection; public MongoDBSourceConfig() { this.snapshotFetchSize = DEFAULT_SNAPSHOT_FETCH_SIZE; @@ -72,6 +73,10 @@ public Boolean getSSLInvalidHostAllowed() { return this.sslInvalidHostAllowed; } + public Boolean getDirectConnection() { + return this.directConnection; + } + public List getCollections() { return this.collections; }