Skip to content

Commit

Permalink
Update DocumentDB source pipeline config parameters (opensearch-project#4451)
Browse files Browse the repository at this point in the history

* Update DocumentDB source pipeline config parameters

Signed-off-by: Dinu John <[email protected]>

---------

Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh authored Apr 24, 2024
1 parent 3cc41b7 commit 2f0a564
Show file tree
Hide file tree
Showing 15 changed files with 239 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,6 @@ public interface EnhancedSourceCoordinator {
*/
void initialize();

/**
 * Returns the prefix used to namespace partition keys in the source coordination store.
 *
 * @return the partition prefix; may be {@code null} when no prefix is configured
 */
String getPartitionPrefix();

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class EnhancedLeaseBasedSourceCoordinator implements EnhancedSourceCoordi
private final Function<SourcePartitionStoreItem, EnhancedSourcePartition> partitionFactory;

private final PluginMetrics pluginMetrics;
private final String partitionPrefix;

/**
* Use host name of the node as the default ownerId
Expand Down Expand Up @@ -88,6 +89,7 @@ public EnhancedLeaseBasedSourceCoordinator(final SourceCoordinationStore coordin
sourceIdentifier;
this.pluginMetrics = pluginMetrics;
this.partitionFactory = partitionFactory;
this.partitionPrefix = sourceCoordinationConfig.getPartitionPrefix();
}

@Override
Expand Down Expand Up @@ -273,4 +275,8 @@ public Optional<EnhancedSourcePartition> getPartition(final String partitionKey)
return Optional.of(partitionFactory.apply(sourceItem.get()));
}

/**
 * Returns the partition prefix this coordinator was constructed with
 * (taken from the source coordination configuration).
 *
 * @return the configured partition prefix; may be {@code null} when none was configured
 */
@Override
public String getPartitionPrefix() {
return partitionPrefix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,24 @@ private static String encodeString(final String input) {
private static String getConnectionString(final MongoDBSourceConfig sourceConfig) {
final String username;
try {
username = encodeString(sourceConfig.getCredentialsConfig().getUsername());
username = encodeString(sourceConfig.getAuthenticationConfig().getUsername());
} catch (final Exception e) {
throw new RuntimeException("Unsupported characters in username.");
}

final String password;
try {
password = encodeString(sourceConfig.getCredentialsConfig().getPassword());
password = encodeString(sourceConfig.getAuthenticationConfig().getPassword());
} catch (final Exception e) {
throw new RuntimeException("Unsupported characters in password.");
}
final String hostname = sourceConfig.getHost();

if (sourceConfig.getHosts() == null || sourceConfig.getHosts().length == 0) {
throw new RuntimeException("The hosts array should at least have one host.");
}

// Support for only single host
final String hostname = sourceConfig.getHosts()[0];
final int port = sourceConfig.getPort();
final String tls = sourceConfig.getTls().toString();
final String invalidHostAllowed = sourceConfig.getSslInsecureDisableVerification().toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.mongo.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.arns.Arn;

import java.util.Map;
import java.util.Optional;

/**
 * AWS-specific configuration for the DocumentDB/MongoDB source: the STS role
 * to assume for credentials, an optional external id, and optional STS
 * request header overrides.
 */
public class AwsConfig {
    /** Expected IAM resource type ("role") in the sts_role_arn resource segment. */
    private static final String AWS_IAM_ROLE = "role";
    /** Expected service component ("iam") of the sts_role_arn. */
    private static final String AWS_IAM = "iam";

    @JsonProperty("sts_role_arn")
    // Message fixed to match the constraint: min is 20 (shortest valid role ARN), not 1.
    @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 20 and 2048 characters")
    private String awsStsRoleArn;

    @JsonProperty("sts_external_id")
    @Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters")
    private String awsStsExternalId;

    @JsonProperty("sts_header_overrides")
    @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
    private Map<String, String> awsStsHeaderOverrides;

    /**
     * Validates that {@code awsStsRoleArn} parses as an ARN whose service is
     * "iam" and whose resource type is "role".
     *
     * NOTE(review): not invoked anywhere within this class — presumably wired
     * up through bean validation or called by the source at startup; confirm
     * the call site before relying on this check.
     *
     * @throws IllegalArgumentException if the ARN is not an IAM role ARN
     */
    private void validateStsRoleArn() {
        final Arn arn = getArn();
        if (!AWS_IAM.equals(arn.service())) {
            throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
        }
        final Optional<String> resourceType = arn.resource().resourceType();
        if (resourceType.isEmpty() || !resourceType.get().equals(AWS_IAM_ROLE)) {
            throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
        }
    }

    /**
     * Parses {@code awsStsRoleArn} into an {@link Arn}.
     *
     * @throws IllegalArgumentException if the string is not a well-formed ARN
     */
    private Arn getArn() {
        try {
            return Arn.fromString(awsStsRoleArn);
        } catch (final Exception e) {
            throw new IllegalArgumentException(String.format("Invalid ARN format for awsStsRoleArn. Check the format of %s", awsStsRoleArn));
        }
    }

    /** @return the STS role ARN to assume, or {@code null} if not configured */
    public String getAwsStsRoleArn() {
        return awsStsRoleArn;
    }

    /** @return the STS external id, or {@code null} if not configured */
    public String getAwsStsExternalId() {
        return awsStsExternalId;
    }

    /** @return headers to override on STS requests, or {@code null} if not configured */
    public Map<String, String> getAwsStsHeaderOverrides() {
        return awsStsHeaderOverrides;
    }
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,6 @@ public class CollectionConfig {
@JsonProperty("stream")
private boolean stream;

@JsonProperty("s3_bucket")
private String s3Bucket;

@JsonProperty("s3_path_prefix")
private String s3PathPrefix;

@JsonProperty("s3_region")
private String s3Region;

@JsonProperty("partition_count")
private int partitionCount;

Expand Down Expand Up @@ -65,14 +56,6 @@ public boolean isStream() {
return this.stream;
}

public String getS3Bucket() {
return this.s3Bucket;
}

public String getS3PathPrefix() {
return this.s3PathPrefix;
}

/**
 * Returns the configured {@code partition_count} for this collection.
 *
 * @return the partition count (0 if unset, as the field is a primitive int)
 */
public int getPartitionCount() {
return this.partitionCount;
}
Expand All @@ -84,7 +67,4 @@ public int getExportBatchSize() {
/**
 * Returns the batch size used when reading from the change stream.
 * NOTE(review): backing field's JSON property name is outside this view — confirm mapping.
 *
 * @return the stream batch size
 */
public int getStreamBatchSize() {
return this.streamBatchSize;
}
public String getS3Region() {
return this.s3Region;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opensearch.dataprepper.plugins.mongo.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;

import java.time.Duration;
Expand All @@ -15,8 +16,8 @@ public class MongoDBSourceConfig {
private static final String DEFAULT_READ_PREFERENCE = "primaryPreferred";
private static final Boolean DEFAULT_DIRECT_CONNECT = true;
private static final Duration DEFAULT_ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(2);
@JsonProperty("host")
private @NotNull String host;
@JsonProperty("hosts")
private @NotNull String[] hosts;
@JsonProperty("port")
private int port = DEFAULT_PORT;
@JsonProperty("trust_store_file_path")
Expand All @@ -35,6 +36,15 @@ public class MongoDBSourceConfig {
@JsonProperty("acknowledgments")
private Boolean acknowledgments = false;

@JsonProperty("s3_bucket")
private String s3Bucket;

@JsonProperty("s3_prefix")
private String s3Prefix;

@JsonProperty("s3_region")
private String s3Region;

@JsonProperty
private Duration partitionAcknowledgmentTimeout;

Expand All @@ -45,6 +55,11 @@ public class MongoDBSourceConfig {
@JsonProperty("direct_connection")
private Boolean directConnection;

@JsonProperty("aws")
@NotNull
@Valid
private AwsConfig awsConfig;

public MongoDBSourceConfig() {
this.snapshotFetchSize = DEFAULT_SNAPSHOT_FETCH_SIZE;
this.readPreference = DEFAULT_READ_PREFERENCE;
Expand All @@ -55,12 +70,12 @@ public MongoDBSourceConfig() {
this.partitionAcknowledgmentTimeout = DEFAULT_ACKNOWLEDGEMENT_SET_TIMEOUT;
}

public AuthenticationConfig getCredentialsConfig() {
/**
 * Returns the authentication (username/password) configuration.
 * Renamed from {@code getCredentialsConfig()} in this change.
 *
 * @return the nested {@link AuthenticationConfig}; may be {@code null} if not configured
 */
public AuthenticationConfig getAuthenticationConfig() {
return this.authenticationConfig;
}

public String getHost() {
return this.host;
/**
 * Returns the configured {@code hosts} array. Callers validate that it is
 * non-null and non-empty; currently only the first host is used for the
 * connection string.
 *
 * @return the hosts array; {@code @NotNull}-annotated, validated at binding time
 */
public String[] getHosts() {
return this.hosts;
}

public int getPort() {
Expand Down Expand Up @@ -103,6 +118,22 @@ public Duration getPartitionAcknowledgmentTimeout() {
return this.partitionAcknowledgmentTimeout;
}

/**
 * Returns the configured {@code s3_bucket} (moved here from the per-collection config).
 *
 * @return the S3 bucket name; may be {@code null} if not configured
 */
public String getS3Bucket() {
return this.s3Bucket;
}

/**
 * Returns the configured {@code s3_prefix}, used as the leading segment of the
 * S3 partition key path for exported records.
 *
 * @return the S3 path prefix; may be {@code null} if not configured
 */
public String getS3Prefix() {
return this.s3Prefix;
}

/**
 * Returns the configured {@code s3_region} (moved here from the per-collection config).
 *
 * @return the S3 region; may be {@code null} if not configured
 */
public String getS3Region() {
return this.s3Region;
}

/**
 * Returns the AWS configuration block (STS role, external id, header overrides).
 *
 * @return the {@link AwsConfig}; {@code @NotNull}-annotated, validated at binding time
 */
public AwsConfig getAwsConfig() {
return this.awsConfig;
}

public static class AuthenticationConfig {
@JsonProperty("username")
private String username;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
import java.util.List;

public class PartitionKeyRecordConverter extends RecordConverter {
public static final String S3_PATH_DELIMITER = "/";
private List<String> partitionNames = new ArrayList<>();
private int partitionSize = 0;
public PartitionKeyRecordConverter(final String collection, final String partitionType) {
final String s3PathPrefix;
/**
 * Creates a converter that tags events with an S3 partition key.
 *
 * @param collection    the source collection name (passed to the parent converter)
 * @param partitionType the partition type (passed to the parent converter)
 * @param s3PathPrefix  prefix prepended (with a "/" delimiter) to the hashed
 *                      partition key when setting the event's S3 partition key attribute
 */
public PartitionKeyRecordConverter(final String collection, final String partitionType, final String s3PathPrefix) {
super(collection, partitionType);
this.s3PathPrefix = s3PathPrefix;
}

public void initializePartitions(final List<String> partitionNames) {
Expand All @@ -28,7 +31,7 @@ public Event convert(final String record,
final Event event = super.convert(record, eventCreationTimeMillis, eventVersionNumber, eventName);
final EventMetadata eventMetadata = event.getMetadata();
final String partitionKey = String.valueOf(eventMetadata.getAttribute(MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE));
eventMetadata.setAttribute(MetadataKeyAttributes.EVENT_S3_PARTITION_KEY, hashKeyToPartition(partitionKey));
eventMetadata.setAttribute(MetadataKeyAttributes.EVENT_S3_PARTITION_KEY, s3PathPrefix + S3_PATH_DELIMITER + hashKeyToPartition(partitionKey));
return event;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator,
*/
public void start(Buffer<Record<Event>> buffer) {
final List<Runnable> runnableList = new ArrayList<>();
final LeaderScheduler leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig.getCollections());

final LeaderScheduler leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig);
runnableList.add(leaderScheduler);
final List<String> collections = sourceConfig.getCollections().stream().map(CollectionConfig::getCollection).collect(Collectors.toList());
if (!collections.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void initialize(final MongoDBSourceConfig sourceConfig) {

@Override
public void update(MongoDBSourceConfig pluginConfig) {
final MongoDBSourceConfig.AuthenticationConfig newAuthConfig = pluginConfig.getCredentialsConfig();
final MongoDBSourceConfig.AuthenticationConfig newAuthConfig = pluginConfig.getAuthenticationConfig();
if (basicAuthChanged(newAuthConfig)) {
credentialsChangeCounter.increment();
try {
Expand Down Expand Up @@ -96,7 +96,7 @@ private void refreshJobs(MongoDBSourceConfig pluginConfig) {

private boolean basicAuthChanged(final MongoDBSourceConfig.AuthenticationConfig newAuthConfig) {
final MongoDBSourceConfig.AuthenticationConfig currentAuthConfig = currentMongoDBSourceConfig
.getCredentialsConfig();
.getAuthenticationConfig();
return !Objects.equals(currentAuthConfig.getUsername(), newAuthConfig.getUsername()) ||
!Objects.equals(currentAuthConfig.getPassword(), newAuthConfig.getPassword());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public class ExportWorker implements Runnable {
*/
private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 2_000;

private static final String S3_PATH_DELIMITER = "/";

/**
* Start Line is the checkpoint
*/
Expand Down Expand Up @@ -108,8 +110,15 @@ public void run() {
if (sourcePartition.isPresent()) {
dataQueryPartition = (DataQueryPartition) sourcePartition.get();
final AcknowledgementSet acknowledgementSet = createAcknowledgementSet(dataQueryPartition).orElse(null);
final String s3PathPrefix;
if (sourceCoordinator.getPartitionPrefix() != null ) {
s3PathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix() + S3_PATH_DELIMITER + dataQueryPartition.getCollection();
} else {
s3PathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER + dataQueryPartition.getCollection();
}
final DataQueryPartitionCheckpoint partitionCheckpoint = new DataQueryPartitionCheckpoint(sourceCoordinator, dataQueryPartition);
final PartitionKeyRecordConverter recordConverter = new PartitionKeyRecordConverter(dataQueryPartition.getCollection(), ExportPartition.PARTITION_TYPE);
final PartitionKeyRecordConverter recordConverter = new PartitionKeyRecordConverter(dataQueryPartition.getCollection(),
ExportPartition.PARTITION_TYPE, s3PathPrefix);
final ExportPartitionWorker exportPartitionWorker = new ExportPartitionWorker(recordBufferWriter, recordConverter,
dataQueryPartition, acknowledgementSet, sourceConfig, partitionCheckpoint, Instant.now().toEpochMilli(), pluginMetrics);
final CompletableFuture<Void> runLoader = CompletableFuture.runAsync(exportPartitionWorker, executor);
Expand Down
Loading

0 comments on commit 2f0a564

Please sign in to comment.