Skip to content

Commit

Permalink
Update s3 path prefix logic
Browse files Browse the repository at this point in the history
Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh committed Apr 23, 2024
1 parent 8dfae0d commit 9ebaadf
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,6 @@ public interface EnhancedSourceCoordinator {
*/
void initialize();

String getPartitionPrefix();

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class EnhancedLeaseBasedSourceCoordinator implements EnhancedSourceCoordi
private final Function<SourcePartitionStoreItem, EnhancedSourcePartition> partitionFactory;

private final PluginMetrics pluginMetrics;
private final String partitionPrefix;

/**
* Use host name of the node as the default ownerId
Expand Down Expand Up @@ -88,6 +89,7 @@ public EnhancedLeaseBasedSourceCoordinator(final SourceCoordinationStore coordin
sourceIdentifier;
this.pluginMetrics = pluginMetrics;
this.partitionFactory = partitionFactory;
this.partitionPrefix = sourceCoordinationConfig.getPartitionPrefix();
}

@Override
Expand Down Expand Up @@ -273,4 +275,8 @@ public Optional<EnhancedSourcePartition> getPartition(final String partitionKey)
return Optional.of(partitionFactory.apply(sourceItem.get()));
}

@Override
public String getPartitionPrefix() {
return partitionPrefix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import jakarta.validation.constraints.NotNull;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -17,10 +16,6 @@ public class MongoDBSourceConfig {
private static final String DEFAULT_READ_PREFERENCE = "primaryPreferred";
private static final Boolean DEFAULT_DIRECT_CONNECT = true;
private static final Duration DEFAULT_ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(2);
private static final String DATAPREPPER_SERVICE_NAME = "DATAPREPPER_SERVICE_NAME";

private static final long currentTimeInEpochMilli = Instant.now().toEpochMilli();
public static final String S3_PATH_DELIMITER = "/";
@JsonProperty("hosts")
private @NotNull String[] hosts;
@JsonProperty("port")
Expand Down Expand Up @@ -131,16 +126,6 @@ public String getS3Prefix() {
return this.s3Prefix;
}

public String getTransformedS3PathPrefix(final String collection) {
final String serviceName = System.getenv(DATAPREPPER_SERVICE_NAME);
final String suffixPath = serviceName + S3_PATH_DELIMITER + collection + S3_PATH_DELIMITER + currentTimeInEpochMilli;
if (this.getS3Prefix() == null || this.getS3Prefix().trim().isBlank()) {
return suffixPath;
} else {
return this.s3Prefix + S3_PATH_DELIMITER + suffixPath;
}
}

public String getS3Region() {
return this.s3Region;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public class ExportWorker implements Runnable {
*/
private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 2_000;

private static final String S3_PATH_DELIMITER = "/";

/**
* Start Line is the checkpoint
*/
Expand Down Expand Up @@ -108,7 +110,12 @@ public void run() {
if (sourcePartition.isPresent()) {
dataQueryPartition = (DataQueryPartition) sourcePartition.get();
final AcknowledgementSet acknowledgementSet = createAcknowledgementSet(dataQueryPartition).orElse(null);
final String s3PathPrefix = sourceConfig.getTransformedS3PathPrefix(dataQueryPartition.getCollection());
final String s3PathPrefix;
if (sourceCoordinator.getPartitionPrefix() != null ) {
s3PathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix();
} else {
s3PathPrefix = sourceConfig.getS3Prefix();
}
final DataQueryPartitionCheckpoint partitionCheckpoint = new DataQueryPartitionCheckpoint(sourceCoordinator, dataQueryPartition);
final PartitionKeyRecordConverter recordConverter = new PartitionKeyRecordConverter(dataQueryPartition.getCollection(),
ExportPartition.PARTITION_TYPE, s3PathPrefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;

public class LeaderScheduler implements Runnable {
Expand All @@ -28,6 +27,7 @@ public class LeaderScheduler implements Runnable {
* Default duration to extend the timeout of lease
*/
static final int DEFAULT_EXTEND_LEASE_MINUTES = 3;
private static final String S3_PATH_DELIMITER = "/";

/**
* Default interval to run lease check and shard discovery
Expand Down Expand Up @@ -120,7 +120,12 @@ private void init() {
createExportPartition(collectionConfig, startTime);
}

final String s3PathPrefix = sourceConfig.getTransformedS3PathPrefix(collectionConfig.getCollection() + "-" + Instant.now().toEpochMilli());
final String s3PathPrefix;
if (coordinator.getPartitionPrefix() != null ) {
s3PathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER + coordinator.getPartitionPrefix();
} else {
s3PathPrefix = sourceConfig.getS3Prefix();
}
createS3Partition(sourceConfig.getS3Bucket(), sourceConfig.getS3Region(), s3PathPrefix, collectionConfig);

if (collectionConfig.isStream()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class StreamScheduler implements Runnable {
static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 120_000;
static final int DEFAULT_BUFFER_WRITE_INTERVAL_MILLS = 15_000;
private static final int DEFAULT_MONITOR_WAIT_TIME_MS = 15_000;
private static final String S3_PATH_DELIMITER = "/";
/**
* Number of records to accumulate before flushing to buffer
*/
Expand Down Expand Up @@ -90,7 +91,12 @@ private StreamWorker getStreamWorker (final StreamPartition streamPartition) {
final DataStreamPartitionCheckpoint partitionCheckpoint = new DataStreamPartitionCheckpoint(sourceCoordinator, streamPartition);
final StreamAcknowledgementManager streamAcknowledgementManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint,
sourceConfig.getPartitionAcknowledgmentTimeout(), DEFAULT_MONITOR_WAIT_TIME_MS, DEFAULT_CHECKPOINT_INTERVAL_MILLS);
final String s3PathPrefix = sourceConfig.getTransformedS3PathPrefix(streamPartition.getCollection());
final String s3PathPrefix;
if (sourceCoordinator.getPartitionPrefix() != null ) {
s3PathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix();
} else {
s3PathPrefix = sourceConfig.getS3Prefix();
}
final PartitionKeyRecordConverter recordConverter = new PartitionKeyRecordConverter(streamPartition.getCollection(),
StreamPartition.PARTITION_TYPE, s3PathPrefix);
final CollectionConfig partitionCollectionConfig = sourceConfig.getCollections().stream()
Expand Down

0 comments on commit 9ebaadf

Please sign in to comment.