diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourceCoordinator.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourceCoordinator.java index 9b2258b549..232ba58aa1 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourceCoordinator.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourceCoordinator.java @@ -121,4 +121,6 @@ public interface EnhancedSourceCoordinator { */ void initialize(); + String getPartitionPrefix(); + } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java index d088e098db..a07706152a 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java @@ -61,6 +61,7 @@ public class EnhancedLeaseBasedSourceCoordinator implements EnhancedSourceCoordi private final Function partitionFactory; private final PluginMetrics pluginMetrics; + private final String partitionPrefix; /** * Use host name of the node as the default ownerId @@ -88,6 +89,7 @@ public EnhancedLeaseBasedSourceCoordinator(final SourceCoordinationStore coordin sourceIdentifier; this.pluginMetrics = pluginMetrics; this.partitionFactory = partitionFactory; + this.partitionPrefix = sourceCoordinationConfig.getPartitionPrefix(); } @Override @@ -273,4 +275,8 @@ public Optional getPartition(final String partitionKey) return Optional.of(partitionFactory.apply(sourceItem.get())); } + @Override + public String getPartitionPrefix() { + return partitionPrefix; + } } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java index a756e0bc97..d250c4b0c5 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java @@ -5,7 +5,6 @@ import jakarta.validation.constraints.NotNull; import java.time.Duration; -import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -17,10 +16,6 @@ public class MongoDBSourceConfig { private static final String DEFAULT_READ_PREFERENCE = "primaryPreferred"; private static final Boolean DEFAULT_DIRECT_CONNECT = true; private static final Duration DEFAULT_ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(2); - private static final String DATAPREPPER_SERVICE_NAME = "DATAPREPPER_SERVICE_NAME"; - - private static final long currentTimeInEpochMilli = Instant.now().toEpochMilli(); - public static final String S3_PATH_DELIMITER = "/"; @JsonProperty("hosts") private @NotNull String[] hosts; @JsonProperty("port") @@ -131,16 +126,6 @@ public String getS3Prefix() { return this.s3Prefix; } - public String getTransformedS3PathPrefix(final String collection) { - final String serviceName = System.getenv(DATAPREPPER_SERVICE_NAME); - final String suffixPath = serviceName + S3_PATH_DELIMITER + collection + S3_PATH_DELIMITER + currentTimeInEpochMilli; - if (this.getS3Prefix() == null || this.getS3Prefix().trim().isBlank()) { - return suffixPath; - } else { - return this.s3Prefix + S3_PATH_DELIMITER + suffixPath; - } - } - public String getS3Region() { return this.s3Region; } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportWorker.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportWorker.java index 69d50a699b..f91d43c9eb 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportWorker.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportWorker.java @@ -57,6 +57,8 @@ public class ExportWorker implements Runnable { */ private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 2_000; + private static final String S3_PATH_DELIMITER = "/"; + /** * Start Line is the checkpoint */ @@ -108,7 +110,12 @@ public void run() { if (sourcePartition.isPresent()) { dataQueryPartition = (DataQueryPartition) sourcePartition.get(); final AcknowledgementSet acknowledgementSet = createAcknowledgementSet(dataQueryPartition).orElse(null); - final String s3PathPrefix = sourceConfig.getTransformedS3PathPrefix(dataQueryPartition.getCollection()); + final String s3PathPrefix; + if (sourceCoordinator.getPartitionPrefix() != null ) { + s3PathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix(); + } else { + s3PathPrefix = sourceConfig.getS3Prefix(); + } final DataQueryPartitionCheckpoint partitionCheckpoint = new DataQueryPartitionCheckpoint(sourceCoordinator, dataQueryPartition); final PartitionKeyRecordConverter recordConverter = new PartitionKeyRecordConverter(dataQueryPartition.getCollection(), ExportPartition.PARTITION_TYPE, s3PathPrefix); diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java index 6e33910f51..fd3a54c7ce 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java @@ -17,7 +17,6 @@ import java.time.Duration; import java.time.Instant; -import java.util.List; import java.util.Optional; public class LeaderScheduler implements Runnable { @@ -28,6 +27,7 @@ public class LeaderScheduler implements Runnable { * Default duration to extend the timeout of lease */ static final int DEFAULT_EXTEND_LEASE_MINUTES = 3; + private static final String S3_PATH_DELIMITER = "/"; /** * Default interval to run lease check and shard discovery @@ -120,7 +120,12 @@ private void init() { createExportPartition(collectionConfig, startTime); } - final String s3PathPrefix = sourceConfig.getTransformedS3PathPrefix(collectionConfig.getCollection() + "-" + Instant.now().toEpochMilli()); + final String s3PathPrefix; + if (coordinator.getPartitionPrefix() != null ) { + s3PathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER + coordinator.getPartitionPrefix(); + } else { + s3PathPrefix = sourceConfig.getS3Prefix(); + } createS3Partition(sourceConfig.getS3Bucket(), sourceConfig.getS3Region(), s3PathPrefix, collectionConfig); if (collectionConfig.isStream()) { diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java index 25ddb84063..3781f65803 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java @@ -26,6 +26,7 @@ public class StreamScheduler implements Runnable { static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 120_000; static final int DEFAULT_BUFFER_WRITE_INTERVAL_MILLS = 15_000; private static final int DEFAULT_MONITOR_WAIT_TIME_MS = 15_000; + private static final String S3_PATH_DELIMITER = "/"; /** * Number of records to accumulate before flushing to buffer */ @@ -90,7 +91,12 @@ private StreamWorker getStreamWorker (final StreamPartition streamPartition) { final DataStreamPartitionCheckpoint partitionCheckpoint = new DataStreamPartitionCheckpoint(sourceCoordinator, streamPartition); final StreamAcknowledgementManager streamAcknowledgementManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, sourceConfig.getPartitionAcknowledgmentTimeout(), DEFAULT_MONITOR_WAIT_TIME_MS, DEFAULT_CHECKPOINT_INTERVAL_MILLS); - final String s3PathPrefix = sourceConfig.getTransformedS3PathPrefix(streamPartition.getCollection()); + final String s3PathPrefix; + if (sourceCoordinator.getPartitionPrefix() != null ) { + s3PathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix(); + } else { + s3PathPrefix = sourceConfig.getS3Prefix(); + } final PartitionKeyRecordConverter recordConverter = new PartitionKeyRecordConverter(streamPartition.getCollection(), StreamPartition.PARTITION_TYPE, s3PathPrefix); final CollectionConfig partitionCollectionConfig = sourceConfig.getCollections().stream()