diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnection.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnection.java index 33faaf5654..fcf9aa83f1 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnection.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnection.java @@ -52,7 +52,7 @@ private static String getConnectionString(final MongoDBSourceConfig sourceConfig } catch (final Exception e) { throw new RuntimeException("Unsupported characters in password."); } - final String hostname = sourceConfig.getHostname(); + final String hostname = sourceConfig.getHost(); final int port = sourceConfig.getPort(); final String tls = sourceConfig.getTls().toString(); final String invalidHostAllowed = sourceConfig.getSslInsecureDisableVerification().toString(); diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/CollectionConfig.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/CollectionConfig.java index 2a6f449165..232d7c95e1 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/CollectionConfig.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/CollectionConfig.java @@ -15,11 +15,11 @@ public class CollectionConfig { @JsonProperty("export_config") private ExportConfig exportConfig; - @JsonProperty("export_enabled") - private boolean exportEnabled; + @JsonProperty("export") + private boolean export; - @JsonProperty("stream_enabled") - private boolean streamEnabled; + @JsonProperty("stream") + private boolean stream; @JsonProperty("s3_bucket") private String s3Bucket; @@ -34,8 +34,8 @@ public class CollectionConfig { private int streamBatchSize; public CollectionConfig() { - this.exportEnabled = true; - this.streamEnabled = true; + this.export = true; + this.stream = true; this.exportConfig = new ExportConfig(); this.streamBatchSize = DEFAULT_STREAM_BATCH_SIZE; } @@ -52,12 +52,12 @@ public String getCollectionName() { return Arrays.stream(collection.split(COLLECTION_SPLITTER)).collect(Collectors.toList()).get(1); } - public boolean isExportEnabled() { - return this.exportEnabled; + public boolean isExport() { + return this.export; } - public boolean isStreamEnabled() { - return this.streamEnabled; + public boolean isStream() { + return this.stream; } public String getS3Bucket() { diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java index 653611490c..a9ed0b6981 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java @@ -15,16 +15,16 @@ public class MongoDBSourceConfig { private static final String DEFAULT_READ_PREFERENCE = "primaryPreferred"; private static final Boolean DEFAULT_DIRECT_CONNECT = true; private static final Duration DEFAULT_ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(2); - @JsonProperty("hostname") - private @NotNull String hostname; + @JsonProperty("host") + private @NotNull String host; @JsonProperty("port") private int port = DEFAULT_PORT; @JsonProperty("trust_store_file_path") private String trustStoreFilePath; @JsonProperty("trust_store_password") private String trustStorePassword; - @JsonProperty("credentials") - private CredentialsConfig credentialsConfig; + @JsonProperty("authentication") + private AuthenticationConfig authenticationConfig; @JsonProperty("snapshot_fetch_size") private String snapshotFetchSize; @@ -55,12 +55,12 @@ public MongoDBSourceConfig() { this.partitionAcknowledgmentTimeout = DEFAULT_ACKNOWLEDGEMENT_SET_TIMEOUT; } - public CredentialsConfig getCredentialsConfig() { - return this.credentialsConfig; + public AuthenticationConfig getCredentialsConfig() { + return this.authenticationConfig; } - public String getHostname() { - return this.hostname; + public String getHost() { + return this.host; } public int getPort() { @@ -103,7 +103,7 @@ public Duration getPartitionAcknowledgmentTimeout() { return this.partitionAcknowledgmentTimeout; } - public static class CredentialsConfig { + public static class AuthenticationConfig { @JsonProperty("username") private String username; diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java index c606b7345a..38f5c4cf93 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java @@ -55,14 +55,14 @@ public void start(Buffer> buffer) { final LeaderScheduler leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig.getCollections()); runnableList.add(leaderScheduler); - if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isExportEnabled)) { + if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isExport)) { final ExportScheduler exportScheduler = new ExportScheduler(sourceCoordinator, mongoDBExportPartitionSupplier, pluginMetrics); final ExportWorker exportWorker = new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, sourceConfig); runnableList.add(exportScheduler); runnableList.add(exportWorker); } - if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isStreamEnabled)) { + if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isStream)) { final S3PartitionCreatorScheduler s3PartitionCreatorScheduler = new S3PartitionCreatorScheduler(sourceCoordinator); runnableList.add(s3PartitionCreatorScheduler); final StreamScheduler streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, pluginMetrics); diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java index 178fdf677d..c4ab7c8c85 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java @@ -113,15 +113,15 @@ private void init() { coordinator.createPartition(new GlobalState(collectionConfig.getCollection(), null)); final Instant startTime = Instant.now(); - final boolean exportRequired = collectionConfig.isExportEnabled(); - LOG.info("Ingestion mode export {} and stream {} for Collection {}", collectionConfig.isExportEnabled(), collectionConfig.isStreamEnabled(), collectionConfig.getCollection()); + final boolean exportRequired = collectionConfig.isExport(); + LOG.info("Ingestion mode export {} and stream {} for Collection {}", collectionConfig.isExport(), collectionConfig.isStream(), collectionConfig.getCollection()); if (exportRequired) { createExportPartition(collectionConfig, startTime); } createS3Partition(collectionConfig); - if (collectionConfig.isStreamEnabled()) { + if (collectionConfig.isStream()) { createStreamPartition(collectionConfig, startTime, exportRequired); } diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnectionTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnectionTest.java index e66eaf56b9..23ee2bc37d 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnectionTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnectionTest.java @@ -28,16 +28,16 @@ public class MongoDBConnectionTest { private MongoDBSourceConfig mongoDBSourceConfig; @Mock - private MongoDBSourceConfig.CredentialsConfig credentialsConfig; + private MongoDBSourceConfig.AuthenticationConfig authenticationConfig; private final Random random = new Random(); @BeforeEach void setUp() { - when(mongoDBSourceConfig.getCredentialsConfig()).thenReturn(credentialsConfig); - when(credentialsConfig.getUsername()).thenReturn("\uD800\uD800" + UUID.randomUUID()); - when(credentialsConfig.getPassword()).thenReturn("aЯ ⾀sd?q=%%l€0£.lo" + UUID.randomUUID()); - when(mongoDBSourceConfig.getHostname()).thenReturn(UUID.randomUUID().toString()); + when(mongoDBSourceConfig.getCredentialsConfig()).thenReturn(authenticationConfig); + when(authenticationConfig.getUsername()).thenReturn("\uD800\uD800" + UUID.randomUUID()); + when(authenticationConfig.getPassword()).thenReturn("aЯ ⾀sd?q=%%l€0£.lo" + UUID.randomUUID()); + when(mongoDBSourceConfig.getHost()).thenReturn(UUID.randomUUID().toString()); when(mongoDBSourceConfig.getPort()).thenReturn(getRandomInteger()); when(mongoDBSourceConfig.getTls()).thenReturn(getRandomBoolean()); when(mongoDBSourceConfig.getSslInsecureDisableVerification()).thenReturn(getRandomBoolean()); diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java index b882b77a2f..66e1df3779 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java @@ -63,8 +63,8 @@ void test_should_init() { leaderScheduler = new LeaderScheduler(coordinator, List.of(collectionConfig), Duration.ofMillis(100)); leaderPartition = new LeaderPartition(); given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition)); - given(collectionConfig.isExportEnabled()).willReturn(true); - given(collectionConfig.isStreamEnabled()).willReturn(true); + given(collectionConfig.isExport()).willReturn(true); + given(collectionConfig.isStream()).willReturn(true); given(collectionConfig.getExportConfig()).willReturn(exportConfig); given(exportConfig.getItemsPerPartition()).willReturn(new Random().nextInt()); given(collectionConfig.getCollection()).willReturn(UUID.randomUUID().toString()); @@ -97,7 +97,7 @@ void test_should_init_export() { leaderScheduler = new LeaderScheduler(coordinator, List.of(collectionConfig), Duration.ofMillis(100)); leaderPartition = new LeaderPartition(); given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition)); - given(collectionConfig.isExportEnabled()).willReturn(true); + given(collectionConfig.isExport()).willReturn(true); given(collectionConfig.getExportConfig()).willReturn(exportConfig); given(exportConfig.getItemsPerPartition()).willReturn(new Random().nextInt()); given(collectionConfig.getCollection()).willReturn(UUID.randomUUID().toString()); @@ -130,7 +130,7 @@ void test_should_init_stream() { leaderScheduler = new LeaderScheduler(coordinator, List.of(collectionConfig), Duration.ofMillis(100)); leaderPartition = new LeaderPartition(); given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition)); - given(collectionConfig.isStreamEnabled()).willReturn(true); + given(collectionConfig.isStream()).willReturn(true); given(collectionConfig.getCollection()).willReturn(UUID.randomUUID().toString()); final ExecutorService executorService = Executors.newSingleThreadExecutor();