Update DocumentDB source pipeline config parameters
Signed-off-by: Dinu John <[email protected]>
dinujoh committed Apr 22, 2024
1 parent 8255920 commit 5145cdb
Showing 11 changed files with 181 additions and 56 deletions.
@@ -42,17 +42,24 @@ private static String encodeString(final String input) {
private static String getConnectionString(final MongoDBSourceConfig sourceConfig) {
final String username;
try {
- username = encodeString(sourceConfig.getCredentialsConfig().getUsername());
+ username = encodeString(sourceConfig.getAuthenticationConfig().getUsername());
} catch (final Exception e) {
throw new RuntimeException("Unsupported characters in username.");
}

final String password;
try {
- password = encodeString(sourceConfig.getCredentialsConfig().getPassword());
+ password = encodeString(sourceConfig.getAuthenticationConfig().getPassword());
} catch (final Exception e) {
throw new RuntimeException("Unsupported characters in password.");
}
- final String hostname = sourceConfig.getHost();
+
+ if (sourceConfig.getHosts() == null || sourceConfig.getHosts().length == 0) {
+     throw new RuntimeException("The hosts array should at least have one host.");
+ }
+
+ // Support for only single host
+ final String hostname = sourceConfig.getHosts()[0];
final int port = sourceConfig.getPort();
final String tls = sourceConfig.getTls().toString();
final String invalidHostAllowed = sourceConfig.getSslInsecureDisableVerification().toString();
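As a reading aid, here is a minimal, self-contained Java sketch of the single-host flow above. It assumes encodeString is plain URL-encoding and a simplified URI shape; the plugin's actual query parameters (read preference, TLS flags, and so on) are built further down in this file and are not reproduced here.

    import java.net.URLEncoder;
    import java.nio.charset.StandardCharsets;

    // Sketch only: mirrors the hosts[0] selection and empty-hosts check above.
    // The encodeString body and the URI parameters are assumptions.
    final class ConnectionStringSketch {
        static String encodeString(final String input) {
            return URLEncoder.encode(input, StandardCharsets.UTF_8);
        }

        static String getConnectionString(final String username, final String password,
                                          final String[] hosts, final int port, final boolean tls) {
            if (hosts == null || hosts.length == 0) {
                throw new RuntimeException("The hosts array should at least have one host.");
            }
            // Only the first configured host is used (single-host support only).
            final String hostname = hosts[0];
            return String.format("mongodb://%s:%s@%s:%d/?tls=%b",
                    encodeString(username), encodeString(password), hostname, port, tls);
        }

        public static void main(final String[] args) {
            // Hypothetical values, for illustration only.
            System.out.println(getConnectionString("dbUser", "p@ss/word",
                    new String[] {"docdb-cluster.example.us-east-1.docdb.amazonaws.com"}, 27017, true));
        }
    }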
AwsConfig.java (new file)
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.mongo.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;

import java.util.Map;
import java.util.Optional;

public class AwsConfig {
private static final String AWS_IAM_ROLE = "role";
private static final String AWS_IAM = "iam";

@JsonProperty("sts_role_arn")
@Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 20 and 2048 characters")
private String awsStsRoleArn;

@JsonProperty("sts_external_id")
@Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters")
private String awsStsExternalId;

@JsonProperty("sts_header_overrides")
@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
private Map<String, String> awsStsHeaderOverrides;

private void validateStsRoleArn() {
final Arn arn = getArn();
if (!AWS_IAM.equals(arn.service())) {
throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
}
final Optional<String> resourceType = arn.resource().resourceType();
if (resourceType.isEmpty() || !resourceType.get().equals(AWS_IAM_ROLE)) {
throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
}
}

private Arn getArn() {
try {
return Arn.fromString(awsStsRoleArn);
} catch (final Exception e) {
throw new IllegalArgumentException(String.format("Invalid ARN format for awsStsRoleArn. Check the format of %s", awsStsRoleArn));
}
}

public String getAwsStsRoleArn() {
return awsStsRoleArn;
}

public String getAwsStsExternalId() {
return awsStsExternalId;
}

public Map<String, String> getAwsStsHeaderOverrides() {
return awsStsHeaderOverrides;
}
}
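The two checks in validateStsRoleArn hinge on how the AWS SDK arns module (imported above) splits an ARN into a service segment and a typed resource. A short sketch with a hypothetical role ARN:

    import java.util.Optional;
    import software.amazon.awssdk.arns.Arn;

    // Demonstrates the service and resource-type checks performed above.
    final class ArnCheckSketch {
        public static void main(final String[] args) {
            // Hypothetical role ARN; the account id and role name are made up.
            final Arn arn = Arn.fromString("arn:aws:iam::123456789012:role/DocDbPipelineRole");

            System.out.println(arn.service()); // "iam" -> passes the service check

            final Optional<String> resourceType = arn.resource().resourceType();
            System.out.println(resourceType.orElse("<none>")); // "role" -> passes the resource-type check

            // arn:aws:iam::123456789012:user/SomeUser would pass the service check
            // but fail the resource-type check, since its resource type is "user".
        }
    }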

CollectionConfig.java
@@ -20,15 +20,6 @@ public class CollectionConfig
@JsonProperty("stream")
private boolean stream;

@JsonProperty("s3_bucket")
private String s3Bucket;

@JsonProperty("s3_path_prefix")
private String s3PathPrefix;

@JsonProperty("s3_region")
private String s3Region;

@JsonProperty("partition_count")
private int partitionCount;

@@ -65,14 +56,6 @@ public boolean isStream() {
return this.stream;
}

- public String getS3Bucket() {
-     return this.s3Bucket;
- }
-
- public String getS3PathPrefix() {
-     return this.s3PathPrefix;
- }
-
public int getPartitionCount() {
return this.partitionCount;
}
@@ -84,7 +67,4 @@ public int getExportBatchSize() {
public int getStreamBatchSize() {
return this.streamBatchSize;
}
- public String getS3Region() {
-     return this.s3Region;
- }
}
MongoDBSourceConfig.java
@@ -1,9 +1,11 @@
package org.opensearch.dataprepper.plugins.mongo.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
+ import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;

import java.time.Duration;
+ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

@@ -15,8 +17,11 @@ public class MongoDBSourceConfig {
private static final String DEFAULT_READ_PREFERENCE = "primaryPreferred";
private static final Boolean DEFAULT_DIRECT_CONNECT = true;
private static final Duration DEFAULT_ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(2);
@JsonProperty("host")
private @NotNull String host;
private static final String DATAPREPPER_SERVICE_NAME = "DATAPREPPER_SERVICE_NAME";

private static final long currentTimeInEpochMilli = Instant.now().toEpochMilli();
@JsonProperty("hosts")
private @NotNull String[] hosts;
@JsonProperty("port")
private int port = DEFAULT_PORT;
@JsonProperty("trust_store_file_path")
@@ -35,6 +40,15 @@ public class MongoDBSourceConfig {
@JsonProperty("acknowledgments")
private Boolean acknowledgments = false;

@JsonProperty("s3_bucket")
private String s3Bucket;

@JsonProperty("s3_path_prefix")
private String s3PathPrefix;

@JsonProperty("s3_region")
private String s3Region;

@JsonProperty
private Duration partitionAcknowledgmentTimeout;

@@ -45,6 +59,11 @@ public class MongoDBSourceConfig {
@JsonProperty("direct_connection")
private Boolean directConnection;

@JsonProperty("aws")
@NotNull
@Valid
private AwsConfig awsConfig;

public MongoDBSourceConfig() {
this.snapshotFetchSize = DEFAULT_SNAPSHOT_FETCH_SIZE;
this.readPreference = DEFAULT_READ_PREFERENCE;
@@ -55,12 +74,12 @@ public MongoDBSourceConfig() {
this.partitionAcknowledgmentTimeout = DEFAULT_ACKNOWLEDGEMENT_SET_TIMEOUT;
}

- public AuthenticationConfig getCredentialsConfig() {
+ public AuthenticationConfig getAuthenticationConfig() {
return this.authenticationConfig;
}

- public String getHost() {
-     return this.host;
+ public String[] getHosts() {
+     return this.hosts;
}

public int getPort() {
@@ -103,6 +122,32 @@ public Duration getPartitionAcknowledgmentTimeout() {
return this.partitionAcknowledgmentTimeout;
}

+ public String getS3Bucket() {
+     return this.s3Bucket;
+ }
+
+ public String getS3PathPrefix() {
+     return this.s3PathPrefix;
+ }
+
+ public String getTransformedS3PathPrefix(final String collection) {
+     final String serviceName = System.getenv(DATAPREPPER_SERVICE_NAME);
+     final String suffixPath = serviceName + "/" + collection + "/" + currentTimeInEpochMilli;
+     if (this.getS3PathPrefix() == null || this.getS3PathPrefix().isBlank()) {
+         return suffixPath;
+     } else {
+         return this.getS3PathPrefix() + "/" + suffixPath;
+     }
+ }
+
+ public String getS3Region() {
+     return this.s3Region;
+ }
+
+ public AwsConfig getAwsConfig() {
+     return this.awsConfig;
+ }
+
public static class AuthenticationConfig {
@JsonProperty("username")
private String username;
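A worked example of getTransformedS3PathPrefix with hypothetical values: with the environment variable DATAPREPPER_SERVICE_NAME set to my-pipeline, a collection of orders, and s3_path_prefix set to docdb, the transformed prefix is docdb/my-pipeline/orders/<epoch-millis>; with s3_path_prefix unset or blank it is my-pipeline/orders/<epoch-millis>. Because currentTimeInEpochMilli is a static field, the timestamp is captured once per process and shared by all prefixes from the same run.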
PartitionKeyRecordConverter.java
@@ -9,10 +9,13 @@
import java.util.List;

public class PartitionKeyRecordConverter extends RecordConverter {
+ public static final String S3_PATH_DELIMITER = "/";
private List<String> partitionNames = new ArrayList<>();
private int partitionSize = 0;
+ final String s3PathPrefix;
- public PartitionKeyRecordConverter(final String collection, final String partitionType) {
+ public PartitionKeyRecordConverter(final String collection, final String partitionType, final String s3PathPrefix) {
      super(collection, partitionType);
+     this.s3PathPrefix = s3PathPrefix;
}

public void initializePartitions(final List<String> partitionNames) {
@@ -28,7 +31,7 @@ public Event convert(final String record,
final Event event = super.convert(record, eventCreationTimeMillis, eventVersionNumber, eventName);
final EventMetadata eventMetadata = event.getMetadata();
final String partitionKey = String.valueOf(eventMetadata.getAttribute(MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE));
- eventMetadata.setAttribute(MetadataKeyAttributes.EVENT_S3_PARTITION_KEY, hashKeyToPartition(partitionKey));
+ eventMetadata.setAttribute(MetadataKeyAttributes.EVENT_S3_PARTITION_KEY, s3PathPrefix + S3_PATH_DELIMITER + hashKeyToPartition(partitionKey));
return event;
}

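For example, with hypothetical values: if s3PathPrefix is docdb/my-pipeline/orders/1713744000000 and a record's partition key hashes to the partition named 0003, the EVENT_S3_PARTITION_KEY metadata becomes docdb/my-pipeline/orders/1713744000000/0003, so every S3 partition key is now namespaced under the transformed prefix.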
DocumentDBService.java
@@ -53,7 +53,7 @@ public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator,
public void start(Buffer<Record<Event>> buffer) {
final List<Runnable> runnableList = new ArrayList<>();

- final LeaderScheduler leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig.getCollections());
+ final LeaderScheduler leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig);
runnableList.add(leaderScheduler);

if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isExport)) {
@@ -108,8 +108,10 @@ public void run() {
if (sourcePartition.isPresent()) {
dataQueryPartition = (DataQueryPartition) sourcePartition.get();
final AcknowledgementSet acknowledgementSet = createAcknowledgementSet(dataQueryPartition).orElse(null);
+ final String s3PathPrefix = sourceConfig.getTransformedS3PathPrefix(dataQueryPartition.getCollection());
final DataQueryPartitionCheckpoint partitionCheckpoint = new DataQueryPartitionCheckpoint(sourceCoordinator, dataQueryPartition);
- final PartitionKeyRecordConverter recordConverter = new PartitionKeyRecordConverter(dataQueryPartition.getCollection(), ExportPartition.PARTITION_TYPE);
+ final PartitionKeyRecordConverter recordConverter = new PartitionKeyRecordConverter(dataQueryPartition.getCollection(),
+         ExportPartition.PARTITION_TYPE, s3PathPrefix);
final ExportPartitionWorker exportPartitionWorker = new ExportPartitionWorker(recordBufferWriter, recordConverter,
dataQueryPartition, acknowledgementSet, sourceConfig, partitionCheckpoint, Instant.now().toEpochMilli(), pluginMetrics);
final CompletableFuture<Void> runLoader = CompletableFuture.runAsync(exportPartitionWorker, executor);
LeaderScheduler.java
@@ -2,6 +2,7 @@

import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
+ import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.LeaderPartition;
@@ -33,22 +34,22 @@ public class LeaderScheduler implements Runnable {
*/
private static final Duration DEFAULT_LEASE_INTERVAL = Duration.ofMinutes(1);

- private final List<CollectionConfig> collectionConfigs;
+ private final MongoDBSourceConfig sourceConfig;

private final EnhancedSourceCoordinator coordinator;

private final Duration leaseInterval;

private LeaderPartition leaderPartition;

- public LeaderScheduler(EnhancedSourceCoordinator coordinator, List<CollectionConfig> collectionConfigs) {
-     this(coordinator, collectionConfigs, DEFAULT_LEASE_INTERVAL);
+ public LeaderScheduler(final EnhancedSourceCoordinator coordinator, final MongoDBSourceConfig sourceConfig) {
+     this(coordinator, sourceConfig, DEFAULT_LEASE_INTERVAL);
}

LeaderScheduler(EnhancedSourceCoordinator coordinator,
- List<CollectionConfig> collectionConfigs,
+ MongoDBSourceConfig sourceConfig,
Duration leaseInterval) {
- this.collectionConfigs = collectionConfigs;
+ this.sourceConfig = sourceConfig;
this.coordinator = coordinator;
this.leaseInterval = leaseInterval;
}
@@ -106,7 +107,7 @@ public void run() {
private void init() {
LOG.info("Try to initialize DocumentDB Leader Partition");

- collectionConfigs.forEach(collectionConfig -> {
+ sourceConfig.getCollections().forEach(collectionConfig -> {
// Create a Global state in the coordination table for the configuration.
// Global State here is designed to be able to read whenever needed
// So that the jobs can refer to the configuration.
@@ -119,7 +120,8 @@ private void init() {
createExportPartition(collectionConfig, startTime);
}

- createS3Partition(collectionConfig);
+ final String s3PathPrefix = sourceConfig.getTransformedS3PathPrefix(collectionConfig.getCollection() + "-" + Instant.now().toEpochMilli());
+ createS3Partition(sourceConfig.getS3Bucket(), sourceConfig.getS3Region(), s3PathPrefix, collectionConfig);

if (collectionConfig.isStream()) {
createStreamPartition(collectionConfig, startTime, exportRequired);
@@ -137,10 +139,10 @@ private void init() {
*
* @param collectionConfig collection configuration object containing collection details
*/
- private void createS3Partition(final CollectionConfig collectionConfig) {
+ private void createS3Partition(final String s3Bucket, final String s3Region, final String s3PathPrefix, final CollectionConfig collectionConfig) {
      LOG.info("Creating s3 folder global partition: {}", collectionConfig.getCollection());
-     coordinator.createPartition(new S3FolderPartition(collectionConfig.getS3Bucket(), collectionConfig.getS3PathPrefix(),
-             collectionConfig.getS3Region(), collectionConfig.getCollection(), collectionConfig.getPartitionCount()));
+     coordinator.createPartition(new S3FolderPartition(s3Bucket, s3PathPrefix,
+             s3Region, collectionConfig.getCollection(), collectionConfig.getPartitionCount()));
}

/**
@@ -90,7 +90,9 @@ private StreamWorker getStreamWorker (final StreamPartition streamPartition) {
final DataStreamPartitionCheckpoint partitionCheckpoint = new DataStreamPartitionCheckpoint(sourceCoordinator, streamPartition);
final StreamAcknowledgementManager streamAcknowledgementManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint,
sourceConfig.getPartitionAcknowledgmentTimeout(), DEFAULT_MONITOR_WAIT_TIME_MS, DEFAULT_CHECKPOINT_INTERVAL_MILLS);
- final PartitionKeyRecordConverter recordConverter = new PartitionKeyRecordConverter(streamPartition.getCollection(),StreamPartition.PARTITION_TYPE);
+ final String s3PathPrefix = sourceConfig.getTransformedS3PathPrefix(streamPartition.getCollection());
+ final PartitionKeyRecordConverter recordConverter = new PartitionKeyRecordConverter(streamPartition.getCollection(),
+         StreamPartition.PARTITION_TYPE, s3PathPrefix);
final CollectionConfig partitionCollectionConfig = sourceConfig.getCollections().stream()
.filter(collectionConfig -> collectionConfig.getCollection().equals(streamPartition.getCollection()))
.findFirst()