Skip to content

Commit

Permalink
Rename DocDB/MongoDB config parameters (opensearch-project#4426)
Browse files Browse the repository at this point in the history
* Rename DocDB/MongoDB config parameters

Signed-off-by: Dinu John <[email protected]>

* Update hostname to host in DocDB/MongDb config parameter

Signed-off-by: Dinu John <[email protected]>

---------

Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh authored Apr 16, 2024
1 parent 11b18cd commit 42596a9
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private static String getConnectionString(final MongoDBSourceConfig sourceConfig
} catch (final Exception e) {
throw new RuntimeException("Unsupported characters in password.");
}
final String hostname = sourceConfig.getHostname();
final String hostname = sourceConfig.getHost();
final int port = sourceConfig.getPort();
final String tls = sourceConfig.getTls().toString();
final String invalidHostAllowed = sourceConfig.getSslInsecureDisableVerification().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ public class CollectionConfig {
@JsonProperty("export_config")
private ExportConfig exportConfig;

@JsonProperty("export_enabled")
private boolean exportEnabled;
@JsonProperty("export")
private boolean export;

@JsonProperty("stream_enabled")
private boolean streamEnabled;
@JsonProperty("stream")
private boolean stream;

@JsonProperty("s3_bucket")
private String s3Bucket;
Expand All @@ -34,8 +34,8 @@ public class CollectionConfig {
private int streamBatchSize;

public CollectionConfig() {
this.exportEnabled = true;
this.streamEnabled = true;
this.export = true;
this.stream = true;
this.exportConfig = new ExportConfig();
this.streamBatchSize = DEFAULT_STREAM_BATCH_SIZE;
}
Expand All @@ -52,12 +52,12 @@ public String getCollectionName() {
return Arrays.stream(collection.split(COLLECTION_SPLITTER)).collect(Collectors.toList()).get(1);
}

public boolean isExportEnabled() {
return this.exportEnabled;
public boolean isExport() {
return this.export;
}

public boolean isStreamEnabled() {
return this.streamEnabled;
public boolean isStream() {
return this.stream;
}

public String getS3Bucket() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ public class MongoDBSourceConfig {
private static final String DEFAULT_READ_PREFERENCE = "primaryPreferred";
private static final Boolean DEFAULT_DIRECT_CONNECT = true;
private static final Duration DEFAULT_ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(2);
@JsonProperty("hostname")
private @NotNull String hostname;
@JsonProperty("host")
private @NotNull String host;
@JsonProperty("port")
private int port = DEFAULT_PORT;
@JsonProperty("trust_store_file_path")
private String trustStoreFilePath;
@JsonProperty("trust_store_password")
private String trustStorePassword;
@JsonProperty("credentials")
private CredentialsConfig credentialsConfig;
@JsonProperty("authentication")
private AuthenticationConfig authenticationConfig;

@JsonProperty("snapshot_fetch_size")
private String snapshotFetchSize;
Expand Down Expand Up @@ -55,12 +55,12 @@ public MongoDBSourceConfig() {
this.partitionAcknowledgmentTimeout = DEFAULT_ACKNOWLEDGEMENT_SET_TIMEOUT;
}

public CredentialsConfig getCredentialsConfig() {
return this.credentialsConfig;
public AuthenticationConfig getCredentialsConfig() {
return this.authenticationConfig;
}

public String getHostname() {
return this.hostname;
public String getHost() {
return this.host;
}

public int getPort() {
Expand Down Expand Up @@ -103,7 +103,7 @@ public Duration getPartitionAcknowledgmentTimeout() {
return this.partitionAcknowledgmentTimeout;
}

public static class CredentialsConfig {
public static class AuthenticationConfig {
@JsonProperty("username")
private String username;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ public void start(Buffer<Record<Event>> buffer) {
final LeaderScheduler leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig.getCollections());
runnableList.add(leaderScheduler);

if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isExportEnabled)) {
if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isExport)) {
final ExportScheduler exportScheduler = new ExportScheduler(sourceCoordinator, mongoDBExportPartitionSupplier, pluginMetrics);
final ExportWorker exportWorker = new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, sourceConfig);
runnableList.add(exportScheduler);
runnableList.add(exportWorker);
}

if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isStreamEnabled)) {
if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isStream)) {
final S3PartitionCreatorScheduler s3PartitionCreatorScheduler = new S3PartitionCreatorScheduler(sourceCoordinator);
runnableList.add(s3PartitionCreatorScheduler);
final StreamScheduler streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, pluginMetrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ private void init() {
coordinator.createPartition(new GlobalState(collectionConfig.getCollection(), null));

final Instant startTime = Instant.now();
final boolean exportRequired = collectionConfig.isExportEnabled();
LOG.info("Ingestion mode export {} and stream {} for Collection {}", collectionConfig.isExportEnabled(), collectionConfig.isStreamEnabled(), collectionConfig.getCollection());
final boolean exportRequired = collectionConfig.isExport();
LOG.info("Ingestion mode export {} and stream {} for Collection {}", collectionConfig.isExport(), collectionConfig.isStream(), collectionConfig.getCollection());
if (exportRequired) {
createExportPartition(collectionConfig, startTime);
}

createS3Partition(collectionConfig);

if (collectionConfig.isStreamEnabled()) {
if (collectionConfig.isStream()) {
createStreamPartition(collectionConfig, startTime, exportRequired);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ public class MongoDBConnectionTest {
private MongoDBSourceConfig mongoDBSourceConfig;

@Mock
private MongoDBSourceConfig.CredentialsConfig credentialsConfig;
private MongoDBSourceConfig.AuthenticationConfig authenticationConfig;

private final Random random = new Random();

@BeforeEach
void setUp() {
when(mongoDBSourceConfig.getCredentialsConfig()).thenReturn(credentialsConfig);
when(credentialsConfig.getUsername()).thenReturn("\uD800\uD800" + UUID.randomUUID());
when(credentialsConfig.getPassword()).thenReturn("aЯ ⾀sd?q=%%l€0£.lo" + UUID.randomUUID());
when(mongoDBSourceConfig.getHostname()).thenReturn(UUID.randomUUID().toString());
when(mongoDBSourceConfig.getCredentialsConfig()).thenReturn(authenticationConfig);
when(authenticationConfig.getUsername()).thenReturn("\uD800\uD800" + UUID.randomUUID());
when(authenticationConfig.getPassword()).thenReturn("aЯ ⾀sd?q=%%l€0£.lo" + UUID.randomUUID());
when(mongoDBSourceConfig.getHost()).thenReturn(UUID.randomUUID().toString());
when(mongoDBSourceConfig.getPort()).thenReturn(getRandomInteger());
when(mongoDBSourceConfig.getTls()).thenReturn(getRandomBoolean());
when(mongoDBSourceConfig.getSslInsecureDisableVerification()).thenReturn(getRandomBoolean());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ void test_should_init() {
leaderScheduler = new LeaderScheduler(coordinator, List.of(collectionConfig), Duration.ofMillis(100));
leaderPartition = new LeaderPartition();
given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));
given(collectionConfig.isExportEnabled()).willReturn(true);
given(collectionConfig.isStreamEnabled()).willReturn(true);
given(collectionConfig.isExport()).willReturn(true);
given(collectionConfig.isStream()).willReturn(true);
given(collectionConfig.getExportConfig()).willReturn(exportConfig);
given(exportConfig.getItemsPerPartition()).willReturn(new Random().nextInt());
given(collectionConfig.getCollection()).willReturn(UUID.randomUUID().toString());
Expand Down Expand Up @@ -97,7 +97,7 @@ void test_should_init_export() {
leaderScheduler = new LeaderScheduler(coordinator, List.of(collectionConfig), Duration.ofMillis(100));
leaderPartition = new LeaderPartition();
given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));
given(collectionConfig.isExportEnabled()).willReturn(true);
given(collectionConfig.isExport()).willReturn(true);
given(collectionConfig.getExportConfig()).willReturn(exportConfig);
given(exportConfig.getItemsPerPartition()).willReturn(new Random().nextInt());
given(collectionConfig.getCollection()).willReturn(UUID.randomUUID().toString());
Expand Down Expand Up @@ -130,7 +130,7 @@ void test_should_init_stream() {
leaderScheduler = new LeaderScheduler(coordinator, List.of(collectionConfig), Duration.ofMillis(100));
leaderPartition = new LeaderPartition();
given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));
given(collectionConfig.isStreamEnabled()).willReturn(true);
given(collectionConfig.isStream()).willReturn(true);
given(collectionConfig.getCollection()).willReturn(UUID.randomUUID().toString());

final ExecutorService executorService = Executors.newSingleThreadExecutor();
Expand Down

0 comments on commit 42596a9

Please sign in to comment.