Skip to content

Commit

Permalink
Renamed Configuration parameters and refactored the MongoDBHelper class
Browse files Browse the repository at this point in the history
Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh committed Mar 12, 2024
1 parent 3e9b0e8 commit f2573c9
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@

package org.opensearch.dataprepper.plugins.mongo.client;

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.BSONTimestamp;
Expand All @@ -17,56 +13,17 @@
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;
import org.bson.types.Symbol;
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.opensearch.dataprepper.plugins.truststore.TrustStoreProvider;

import java.io.File;
import java.util.Objects;

import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.gte;
import static com.mongodb.client.model.Filters.lte;

public class MongoDBHelper {
private static final String MONGO_CONNECTION_STRING_TEMPLATE = "mongodb://%s:%s@%s:%s/?replicaSet=rs0&directConnection=true&readpreference=%s&ssl=%s&tlsAllowInvalidHostnames=%s&directConnection=%s";
public class BsonHelper {
private static final String BINARY_PARTITION_FORMAT = "%s-%s";
private static final String BINARY_PARTITION_SPLITTER = "-";
private static final String TIMESTAMP_PARTITION_FORMAT = "%s-%s";
private static final String TIMESTAMP_PARTITION_SPLITTER = "-";

public static MongoClient getMongoClient(final MongoDBSourceConfig sourceConfig) {

final String connectionString = getConnectionString(sourceConfig);

final MongoClientSettings.Builder settingBuilder = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(connectionString));

if (Objects.nonNull(sourceConfig.getTrustStoreFilePath())) {
final File truststoreFilePath = new File(sourceConfig.getTrustStoreFilePath());
settingBuilder.applyToSslSettings(builder -> {
builder.enabled(sourceConfig.getSSLEnabled());
builder.invalidHostNameAllowed(sourceConfig.getSSLInvalidHostAllowed());
builder.context(TrustStoreProvider.createSSLContext(truststoreFilePath.toPath(),
sourceConfig.getTrustStorePassword()));
});
}

return MongoClients.create(settingBuilder.build());
}

private static String getConnectionString(final MongoDBSourceConfig sourceConfig) {
final String username = sourceConfig.getCredentialsConfig().getUserName();
final String password = sourceConfig.getCredentialsConfig().getPassword();
final String hostname = sourceConfig.getHostname();
final int port = sourceConfig.getPort();
final String ssl = sourceConfig.getSSLEnabled().toString();
final String invalidHostAllowed = sourceConfig.getSSLInvalidHostAllowed().toString();
final String readPreference = sourceConfig.getReadPreference();
final String directionConnection = sourceConfig.getDirectConnection().toString();
return String.format(MONGO_CONNECTION_STRING_TEMPLATE, username, password, hostname, port,
readPreference, ssl, invalidHostAllowed, directionConnection);
}

public static String getPartitionStringFromMongoDBId(Object id, String className) {
switch (className) {
case "org.bson.Document":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

public class MongoDBSourceConfig {
private static final int DEFAULT_PORT = 27017;
private static final Boolean SSL_ENABLED = false;
private static final Boolean SSL_INVALID_HOST_ALLOWED = false;
private static final Boolean DEFAULT_INSECURE = false;
private static final Boolean DEFAULT_INSECURE_DISABLE_VERIFICATION = false;
private static final String DEFAULT_SNAPSHOT_FETCH_SIZE = "1000";
private static final String DEFAULT_READ_PREFERENCE = "secondaryPreferred";
@JsonProperty("hostname")
Expand All @@ -31,19 +31,19 @@ public class MongoDBSourceConfig {
private List<CollectionConfig> collections;
@JsonProperty("acknowledgments")
private Boolean acknowledgments = false;
@JsonProperty("ssl")
private Boolean ssl;
@JsonProperty("ssl_invalid_host_allowed")
private Boolean sslInvalidHostAllowed;
@JsonProperty("insecure")
private Boolean insecure;
@JsonProperty("ssl_insecure_disable_verification")
private Boolean sslInsecureDisableVerification;
@JsonProperty("direct_connection")
private Boolean directConnection;

public MongoDBSourceConfig() {
this.snapshotFetchSize = DEFAULT_SNAPSHOT_FETCH_SIZE;
this.readPreference = DEFAULT_READ_PREFERENCE;
this.collections = new ArrayList<>();
this.ssl = SSL_ENABLED;
this.sslInvalidHostAllowed = SSL_INVALID_HOST_ALLOWED;
this.insecure = DEFAULT_INSECURE;
this.sslInsecureDisableVerification = DEFAULT_INSECURE_DISABLE_VERIFICATION;
}

public CredentialsConfig getCredentialsConfig() {
Expand All @@ -66,12 +66,12 @@ public String getTrustStorePassword() {
return this.trustStorePassword;
}

public Boolean getSSLEnabled() {
return this.ssl;
public Boolean getInsecure() {
return this.insecure;
}

public Boolean getSSLInvalidHostAllowed() {
return this.sslInvalidHostAllowed;
public Boolean getSslInsecureDisableVerification() {
return this.sslInsecureDisableVerification;
}

public Boolean getDirectConnection() {
Expand All @@ -91,14 +91,14 @@ public boolean isAcknowledgmentsEnabled() {
}

public static class CredentialsConfig {
@JsonProperty("user_name")
private String userName;
@JsonProperty("username")
private String username;

@JsonProperty("password")
private String password;

public String getUserName() {
return userName;
public String getUsername() {
return username;
}

public String getPassword() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.opensearch.dataprepper.plugins.mongo.client;

import com.mongodb.client.MongoClient;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.BSONTimestamp;
Expand All @@ -10,77 +9,27 @@
import org.bson.types.ObjectId;
import org.bson.types.Symbol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.opensearch.dataprepper.plugins.truststore.TrustStoreProvider;

import javax.net.ssl.SSLContext;
import java.nio.file.Path;
import java.util.Random;
import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class MongoDBHelperTest {
@Mock
private MongoDBSourceConfig mongoDBSourceConfig;

@Mock
private MongoDBSourceConfig.CredentialsConfig credentialsConfig;

public class BsonHelperTest {
private final Random random = new Random();

@BeforeEach
void setUp() {

lenient().when(mongoDBSourceConfig.getCredentialsConfig()).thenReturn(credentialsConfig);
lenient().when(credentialsConfig.getUserName()).thenReturn(UUID.randomUUID().toString());
lenient().when(credentialsConfig.getPassword()).thenReturn(UUID.randomUUID().toString());
lenient().when(mongoDBSourceConfig.getHostname()).thenReturn(UUID.randomUUID().toString());
lenient().when(mongoDBSourceConfig.getPort()).thenReturn(getRandomInteger());
lenient().when(mongoDBSourceConfig.getSSLEnabled()).thenReturn(getRandomBoolean());
lenient().when(mongoDBSourceConfig.getSSLInvalidHostAllowed()).thenReturn(getRandomBoolean());
lenient().when(mongoDBSourceConfig.getReadPreference()).thenReturn("secondaryPreferred");
}

@Test
public void getMongoClient() {
final MongoClient mongoClient = MongoDBHelper.getMongoClient(mongoDBSourceConfig);
assertThat(mongoClient, is(notNullValue()));
}

@Test
public void getMongoClientWithTLS() {
when(mongoDBSourceConfig.getTrustStoreFilePath()).thenReturn(UUID.randomUUID().toString());
when(mongoDBSourceConfig.getTrustStorePassword()).thenReturn(UUID.randomUUID().toString());
final Path path = mock(Path.class);
final SSLContext sslContext = mock(SSLContext.class);
try (MockedStatic<TrustStoreProvider> trustStoreProviderMockedStatic = mockStatic(TrustStoreProvider.class)) {
trustStoreProviderMockedStatic.when(() -> TrustStoreProvider.createSSLContext(path,
UUID.randomUUID().toString()))
.thenReturn(sslContext);
final MongoClient mongoClient = MongoDBHelper.getMongoClient(mongoDBSourceConfig);
assertThat(mongoClient, is(notNullValue()));
}
}

@Test
public void getDocumentPartitionStringFromMongoDBId() {
final Document document = mock(Document.class);
when(document.toJson()).thenReturn(UUID.randomUUID().toString());
final String partition = MongoDBHelper.getPartitionStringFromMongoDBId(document, Document.class.getName());
final String partition = BsonHelper.getPartitionStringFromMongoDBId(document, Document.class.getName());
assertThat(partition, is(document.toJson()));
}

Expand All @@ -91,7 +40,7 @@ public void getBinaryPartitionStringFromMongoDBId() {
final byte[] byteData = new byte[] { getRandomByte() };
when(document.getType()).thenReturn(type);
when(document.getData()).thenReturn(byteData);
final String partition = MongoDBHelper.getPartitionStringFromMongoDBId(document, Binary.class.getName());
final String partition = BsonHelper.getPartitionStringFromMongoDBId(document, Binary.class.getName());
assertThat(partition, is(String.format("%s-%s", type, new String(byteData))));
}

Expand All @@ -100,23 +49,23 @@ public void getBSONTimestampPartitionStringFromMongoDBId() {
final BSONTimestamp document = mock(BSONTimestamp.class);
when(document.getInc()).thenReturn(getRandomInteger());
when(document.getTime()).thenReturn(getRandomInteger());
final String partition = MongoDBHelper.getPartitionStringFromMongoDBId(document, BSONTimestamp.class.getName());
final String partition = BsonHelper.getPartitionStringFromMongoDBId(document, BSONTimestamp.class.getName());
assertThat(partition, is(String.format("%s-%s", document.getInc(), document.getTime())));
}

@Test
public void getCodePartitionStringFromMongoDBId() {
final Code document = mock(Code.class);
when(document.getCode()).thenReturn(UUID.randomUUID().toString());
final String partition = MongoDBHelper.getPartitionStringFromMongoDBId(document, Code.class.getName());
final String partition = BsonHelper.getPartitionStringFromMongoDBId(document, Code.class.getName());
assertThat(partition, is(document.getCode()));
}

@Test
public void getPartitionStringFromMongoDBId() {
final ObjectId document = mock(ObjectId.class);
when(document.toString()).thenReturn(UUID.randomUUID().toString());
final String partition = MongoDBHelper.getPartitionStringFromMongoDBId(document, ObjectId.class.getName());
final String partition = BsonHelper.getPartitionStringFromMongoDBId(document, ObjectId.class.getName());
assertThat(partition, is(document.toString()));
}

Expand All @@ -126,7 +75,7 @@ public void buildAndQueryForIntegerClass() {
final String lteValue = String.valueOf(getRandomInteger());
final String expectedGteValueString = String.format("{\"_id\": {\"$gte\": %s}}", gteValue);
final String expectedLteValueString = String.format("{\"_id\": {\"$lte\": %s}}", lteValue);
final Bson bson = MongoDBHelper.buildAndQuery(gteValue, lteValue, Integer.class.getName());
final Bson bson = BsonHelper.buildAndQuery(gteValue, lteValue, Integer.class.getName());
assertThat(bson.toBsonDocument().get("$and").asArray().get(0).toString(), is(expectedGteValueString));
assertThat(bson.toBsonDocument().get("$and").asArray().get(1).toString(), is(expectedLteValueString));
}
Expand All @@ -137,7 +86,7 @@ public void buildAndQueryForDoubleClass() {
final String lteValue = String.valueOf(Double.valueOf(getRandomInteger()));
final String expectedGteValueString = String.format("{\"_id\": {\"$gte\": %s}}", gteValue);
final String expectedLteValueString = String.format("{\"_id\": {\"$lte\": %s}}", lteValue);
final Bson bson = MongoDBHelper.buildAndQuery(gteValue, lteValue, Double.class.getName());
final Bson bson = BsonHelper.buildAndQuery(gteValue, lteValue, Double.class.getName());
assertThat(bson.toBsonDocument().get("$and").asArray().get(0).toString(), is(expectedGteValueString));
assertThat(bson.toBsonDocument().get("$and").asArray().get(1).toString(), is(expectedLteValueString));
}
Expand All @@ -148,7 +97,7 @@ public void buildAndQueryForStringClass() {
final String lteValue = UUID.randomUUID().toString();
final String expectedGteValueString = String.format("{\"_id\": {\"$gte\": \"%s\"}}", gteValue);
final String expectedLteValueString = String.format("{\"_id\": {\"$lte\": \"%s\"}}", lteValue);
final Bson bson = MongoDBHelper.buildAndQuery(gteValue, lteValue, String.class.getName());
final Bson bson = BsonHelper.buildAndQuery(gteValue, lteValue, String.class.getName());
assertThat(bson.toBsonDocument().get("$and").asArray().get(0).toString(), is(expectedGteValueString));
assertThat(bson.toBsonDocument().get("$and").asArray().get(1).toString(), is(expectedLteValueString));
}
Expand All @@ -159,7 +108,7 @@ public void buildAndQueryForLongClass() {
final String lteValue = String.valueOf(Long.valueOf(getRandomInteger()));
final String expectedGteValueString = String.format("{\"_id\": {\"$gte\": %s}}", gteValue);
final String expectedLteValueString = String.format("{\"_id\": {\"$lte\": %s}}", lteValue);
final Bson bson = MongoDBHelper.buildAndQuery(gteValue, lteValue, Long.class.getName());
final Bson bson = BsonHelper.buildAndQuery(gteValue, lteValue, Long.class.getName());
assertThat(bson.toBsonDocument().get("$and").asArray().get(0).toString(), is(expectedGteValueString));
assertThat(bson.toBsonDocument().get("$and").asArray().get(1).toString(), is(expectedLteValueString));
}
Expand All @@ -170,7 +119,7 @@ public void buildAndQueryForObjectIdClass() {
final String lteValue = getRandomHexStringLength24();
final String expectedGteValueString = String.format("{\"_id\": {\"$gte\": {\"$oid\": \"%s\"}}}", gteValue);
final String expectedLteValueString = String.format("{\"_id\": {\"$lte\": {\"$oid\": \"%s\"}}}", lteValue);
final Bson bson = MongoDBHelper.buildAndQuery(gteValue, lteValue, ObjectId.class.getName());
final Bson bson = BsonHelper.buildAndQuery(gteValue, lteValue, ObjectId.class.getName());
assertThat(bson.toBsonDocument().get("$and").asArray().get(0).toString(), is(expectedGteValueString));
assertThat(bson.toBsonDocument().get("$and").asArray().get(1).toString(), is(expectedLteValueString));
}
Expand All @@ -181,7 +130,7 @@ public void buildAndQueryForDecimal128Class() {
final String lteValue = (new Decimal128(getRandomInteger())).toString();
final String expectedGteValueString = String.format("{\"_id\": {\"$gte\": {\"$numberDecimal\": \"%s\"}}}", gteValue);
final String expectedLteValueString = String.format("{\"_id\": {\"$lte\": {\"$numberDecimal\": \"%s\"}}}", lteValue);
final Bson bson = MongoDBHelper.buildAndQuery(gteValue, lteValue, Decimal128.class.getName());
final Bson bson = BsonHelper.buildAndQuery(gteValue, lteValue, Decimal128.class.getName());
assertThat(bson.toBsonDocument().get("$and").asArray().get(0).toString(), is(expectedGteValueString));
assertThat(bson.toBsonDocument().get("$and").asArray().get(1).toString(), is(expectedLteValueString));
}
Expand All @@ -192,7 +141,7 @@ public void buildAndQueryForCodeClass() {
final String lteValue = (new Code(UUID.randomUUID().toString())).getCode();
final String expectedGteValueString = String.format("{\"_id\": {\"$gte\": {\"$code\": \"%s\"}}}", gteValue);
final String expectedLteValueString = String.format("{\"_id\": {\"$lte\": {\"$code\": \"%s\"}}}", lteValue);
final Bson bson = MongoDBHelper.buildAndQuery(gteValue, lteValue, Code.class.getName());
final Bson bson = BsonHelper.buildAndQuery(gteValue, lteValue, Code.class.getName());
assertThat(bson.toBsonDocument().get("$and").asArray().get(0).toString(), is(expectedGteValueString));
assertThat(bson.toBsonDocument().get("$and").asArray().get(1).toString(), is(expectedLteValueString));
}
Expand All @@ -203,7 +152,7 @@ public void buildAndQueryForSymbolClass() {
final String lteValue = (new Symbol(UUID.randomUUID().toString())).getSymbol();
final String expectedGteValueString = String.format("{\"_id\": {\"$gte\": {\"$symbol\": \"%s\"}}}", gteValue);
final String expectedLteValueString = String.format("{\"_id\": {\"$lte\": {\"$symbol\": \"%s\"}}}", lteValue);
final Bson bson = MongoDBHelper.buildAndQuery(gteValue, lteValue, Symbol.class.getName());
final Bson bson = BsonHelper.buildAndQuery(gteValue, lteValue, Symbol.class.getName());
assertThat(bson.toBsonDocument().get("$and").asArray().get(0).toString(), is(expectedGteValueString));
assertThat(bson.toBsonDocument().get("$and").asArray().get(1).toString(), is(expectedLteValueString));
}
Expand All @@ -214,7 +163,7 @@ public void buildAndQueryForDocumentClass() {
final String lteValue = Document.parse(String.format("{\"%s\":\"%s\"}", UUID.randomUUID(), UUID.randomUUID())).toJson();
final String expectedGteValueString = String.format("{\"_id\": {\"$gte\": %s}}", gteValue);
final String expectedLteValueString = String.format("{\"_id\": {\"$lte\": %s}}", lteValue);
final Bson bson = MongoDBHelper.buildAndQuery(gteValue, lteValue, Document.class.getName());
final Bson bson = BsonHelper.buildAndQuery(gteValue, lteValue, Document.class.getName());
assertThat(bson.toBsonDocument().get("$and").asArray().get(0).toString(), is(expectedGteValueString));
assertThat(bson.toBsonDocument().get("$and").asArray().get(1).toString(), is(expectedLteValueString));
}
Expand All @@ -223,12 +172,7 @@ public void buildAndQueryForDocumentClass() {
public void buildAndQueryForUnSupportedClass() {
final String gteValue = new Object().toString();
final String lteValue = new Object().toString();
Assertions.assertThrows(RuntimeException.class, () -> MongoDBHelper.buildAndQuery(gteValue, lteValue, Class.class.getName()));
}


private Boolean getRandomBoolean() {
return Math.random() < 0.5;
Assertions.assertThrows(RuntimeException.class, () -> BsonHelper.buildAndQuery(gteValue, lteValue, Class.class.getName()));
}

private byte getRandomByte() {
Expand Down

0 comments on commit f2573c9

Please sign in to comment.