Skip to content

Commit

Permalink
Remove creating S3 prefix path partition upfront
Browse files Browse the repository at this point in the history
This will be done when S3 sink writes data with path prefix.

Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh committed Apr 17, 2024
1 parent 42596a9 commit b5dedda
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class DocumentDBService {
private static final Logger LOG = LoggerFactory.getLogger(DocumentDBService.class);
Expand Down Expand Up @@ -62,9 +63,13 @@ public void start(Buffer<Record<Event>> buffer) {
runnableList.add(exportWorker);
}

if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isStream)) {
final S3PartitionCreatorScheduler s3PartitionCreatorScheduler = new S3PartitionCreatorScheduler(sourceCoordinator);
final List<String> collections = sourceConfig.getCollections().stream().map(CollectionConfig::getCollection).collect(Collectors.toList());
if (!collections.isEmpty()) {
final S3PartitionCreatorScheduler s3PartitionCreatorScheduler = new S3PartitionCreatorScheduler(sourceCoordinator, collections);
runnableList.add(s3PartitionCreatorScheduler);
}

if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isStream)) {
final StreamScheduler streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, pluginMetrics);
runnableList.add(streamScheduler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class DataQueryPartitionCheckpoint extends S3FolderPartitionCoordinator {


public DataQueryPartitionCheckpoint(EnhancedSourceCoordinator enhancedSourceCoordinator, DataQueryPartition dataQueryPartition) {
super(enhancedSourceCoordinator, dataQueryPartition.getCollection());
super(enhancedSourceCoordinator);
this.enhancedSourceCoordinator = enhancedSourceCoordinator;
this.dataQueryPartition = dataQueryPartition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ public ExportPartitionWorker(final RecordBufferWriter recordBufferWriter,
this.bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
}

private boolean shouldWaitForS3Partition() {
s3PartitionStatus = partitionCheckpoint.getGlobalS3FolderCreationStatus();
private boolean shouldWaitForS3Partition(final String collection) {
s3PartitionStatus = partitionCheckpoint.getGlobalS3FolderCreationStatus(collection);
return s3PartitionStatus.isEmpty();
}

Expand All @@ -120,7 +120,7 @@ public void run() {
throw new RuntimeException("Invalid Collection Name. Must as db.collection format");
}
long lastCheckpointTime = System.currentTimeMillis();
while (shouldWaitForS3Partition() && !Thread.currentThread().isInterrupted()) {
while (shouldWaitForS3Partition(dataQueryPartition.getCollection()) && !Thread.currentThread().isInterrupted()) {
LOG.info("S3 partition was not complete for collection {}, waiting for partitions to be created before resuming export.", dataQueryPartition.getCollection());
try {
Thread.sleep(DEFAULT_PARTITION_CREATE_WAIT_INTERVAL_MILLIS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.mongo.model.S3PartitionStatus;

import java.util.Optional;

Expand All @@ -16,19 +17,16 @@
*/
public class S3FolderPartitionCoordinator {
private final EnhancedSourceCoordinator enhancedSourceCoordinator;
private final String collection;


public S3FolderPartitionCoordinator(final EnhancedSourceCoordinator enhancedSourceCoordinator, final String collection) {
public S3FolderPartitionCoordinator(final EnhancedSourceCoordinator enhancedSourceCoordinator) {
this.enhancedSourceCoordinator = enhancedSourceCoordinator;
this.collection = collection;
}

public Optional<org.opensearch.dataprepper.plugins.mongo.model.S3PartitionStatus> getGlobalS3FolderCreationStatus() {
public Optional<S3PartitionStatus> getGlobalS3FolderCreationStatus(final String collection) {
final Optional<EnhancedSourcePartition> partition = enhancedSourceCoordinator.getPartition(S3PartitionCreatorScheduler.S3_FOLDER_PREFIX + collection);
if(partition.isPresent()) {
final GlobalState globalState = (GlobalState)partition.get();
return Optional.of(org.opensearch.dataprepper.plugins.mongo.model.S3PartitionStatus.fromMap(globalState.getProgressState().get()));
return Optional.of(S3PartitionStatus.fromMap(globalState.getProgressState().get()));
} else {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,24 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import java.util.ArrayList;
import java.util.List;

public class S3PartitionCreator {
private static final Logger LOG = LoggerFactory.getLogger(S3PartitionCreator.class);
final String bucketName;
final String subFolder;
final String region;
final S3Client s3Client;
private final int partitionSize;

S3PartitionCreator(final String bucketName, final String subFolder, final String region) {
this.bucketName = bucketName;
this.subFolder = subFolder;
this.region = region;
this.s3Client = S3Client.builder().region(Region.of(region)).build();
S3PartitionCreator(final int partitionSize) {
this.partitionSize = partitionSize;
}

List<String> createPartition() {
final List<String> partitions = new ArrayList<>();
for (int i = 0; i < 256; i++) {
String folderName = String.format("%02x", i) + "/";
String key = subFolder + "/" + folderName;
createPartition(key);
partitions.add(folderName);
for (int i = 0; i < partitionSize; i++) {
String partitionName = String.format("%02x", i) + "/";
partitions.add(partitionName);
}
LOG.info("S3 partition created successfully.");
return partitions;
}

private void createPartition(final String key) {
try {
s3Client.putObject(PutObjectRequest.builder()
.bucket(bucketName)
.key(key)
.build(), RequestBody.empty());
} catch (final Exception e) {
LOG.error("Error creating partition {}", key, e);
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,22 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class S3PartitionCreatorScheduler implements Runnable {
public class S3PartitionCreatorScheduler extends S3FolderPartitionCoordinator implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(S3PartitionCreatorScheduler.class);
public static final String S3_FOLDER_PREFIX = "S3-FOLDER-";
private static final int DEFAULT_TAKE_LEASE_INTERVAL_MILLIS = 60_000;
private static final int DEFAULT_S3_PARTITION_SIZE = 50;
private final EnhancedSourceCoordinator sourceCoordinator;
public S3PartitionCreatorScheduler(final EnhancedSourceCoordinator sourceCoordinator) {
private final List<String> collections;
public S3PartitionCreatorScheduler(final EnhancedSourceCoordinator sourceCoordinator,
final List<String> collections) {
super(sourceCoordinator);
this.sourceCoordinator = sourceCoordinator;
this.collections = new ArrayList<>(collections);
}

@Override
Expand All @@ -27,10 +33,11 @@ public void run() {
final Optional<EnhancedSourcePartition> sourcePartition = sourceCoordinator.acquireAvailablePartition(S3FolderPartition.PARTITION_TYPE);
if (sourcePartition.isPresent()) {
final S3FolderPartition s3FolderPartition = (S3FolderPartition) sourcePartition.get();
final List<String> s3Folders = createS3BucketPartitions(s3FolderPartition);
final List<String> s3Folders = createS3BucketPartitions();
sourceCoordinator.completePartition(s3FolderPartition);
final S3PartitionStatus s3PartitionStatus = new S3PartitionStatus(s3Folders);
sourceCoordinator.createPartition(new GlobalState(S3_FOLDER_PREFIX + s3FolderPartition.getCollection(), s3PartitionStatus.toMap()));
break;
}

try {
Expand All @@ -39,6 +46,19 @@ public void run() {
LOG.info("The S3 partition creator scheduler was interrupted while waiting to retry, stopping processing");
break;
}

collections.forEach(collection -> {
final Optional<S3PartitionStatus> s3PartitionStatus = getGlobalS3FolderCreationStatus(collection);
if (s3PartitionStatus.isPresent()) {
collections.remove(collection);
}
});

if (collections.isEmpty()) {
LOG.info("The S3 folder partition global state created for all collections.");
break;
}

} catch (final Exception e) {
LOG.error("Received an exception during creation of S3 partition folder, backing off and retrying", e);
try {
Expand All @@ -52,9 +72,8 @@ public void run() {
LOG.warn("S3 partition creator scheduler interrupted, looks like shutdown has triggered");
}

private List<String> createS3BucketPartitions(final S3FolderPartition s3FolderPartition) {
final S3PartitionCreator s3PartitionCreator = new S3PartitionCreator(s3FolderPartition.getBucketName(), s3FolderPartition.getSubFolder(),
s3FolderPartition.getRegion());
private List<String> createS3BucketPartitions() {
final S3PartitionCreator s3PartitionCreator = new S3PartitionCreator(DEFAULT_S3_PARTITION_SIZE);
return s3PartitionCreator.createPartition();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class DataStreamPartitionCheckpoint extends S3FolderPartitionCoordinator

public DataStreamPartitionCheckpoint(final EnhancedSourceCoordinator enhancedSourceCoordinator,
final StreamPartition streamPartition) {
super(enhancedSourceCoordinator, streamPartition.getCollection());
super(enhancedSourceCoordinator);
this.enhancedSourceCoordinator = enhancedSourceCoordinator;
this.streamPartition = streamPartition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ private boolean shouldWaitForExport(final StreamPartition streamPartition) {
return progressState.shouldWaitForExport() && loadStatus.isEmpty();
}

private boolean shouldWaitForS3Partition() {
s3PartitionStatus = partitionCheckpoint.getGlobalS3FolderCreationStatus();
private boolean shouldWaitForS3Partition(final String collection) {
s3PartitionStatus = partitionCheckpoint.getGlobalS3FolderCreationStatus(collection);
return s3PartitionStatus.isEmpty();
}

Expand All @@ -148,7 +148,7 @@ public void processStream(final StreamPartition streamPartition) {
MongoCollection<Document> collection = database.getCollection(collectionDBNameList.get(1));

try (MongoCursor<ChangeStreamDocument<Document>> cursor = getChangeStreamCursor(collection, resumeToken.orElse(null))) {
while ((shouldWaitForExport(streamPartition) || shouldWaitForS3Partition()) && !Thread.currentThread().isInterrupted()) {
while ((shouldWaitForExport(streamPartition) || shouldWaitForS3Partition(streamPartition.getCollection())) && !Thread.currentThread().isInterrupted()) {
LOG.info("Initial load not complete for collection {}, waiting for initial lo be complete before resuming streams.", collectionDbName);
try {
Thread.sleep(DEFAULT_EXPORT_COMPLETE_WAIT_INTERVAL_MILLIS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,13 @@ public void testProcessPartitionSuccess(final String partitionKey) {
lenient().when(dataQueryPartition.getPartitionKey()).thenReturn(partitionKey);
lenient().when(sourceCoordinator.acquireAvailablePartition(DataQueryPartition.PARTITION_TYPE))
.thenReturn(Optional.of(dataQueryPartition));
final String collection = partitionKey.split("\\|")[0];
when(dataQueryPartition.getCollection()).thenReturn(collection);

S3PartitionStatus s3PartitionStatus = mock(S3PartitionStatus.class);
final List<String> partitions = List.of("first", "second");
when(s3PartitionStatus.getPartitions()).thenReturn(partitions);
when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus()).thenReturn(Optional.of(s3PartitionStatus));
when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus));

final Future<?> future = executorService.submit(() -> {
try (MockedStatic<MongoDBConnection> mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ void test_processStream_success() {
S3PartitionStatus s3PartitionStatus = mock(S3PartitionStatus.class);
final List<String> partitions = List.of("first", "second");
when(s3PartitionStatus.getPartitions()).thenReturn(partitions);
when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus()).thenReturn(Optional.of(s3PartitionStatus));
when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus));

try (MockedStatic<MongoDBConnection> mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) {
mongoDBConnectionMockedStatic.when(() -> MongoDBConnection.getMongoClient(any(MongoDBSourceConfig.class)))
Expand All @@ -160,7 +160,7 @@ void test_processStream_success() {
}
verify(mongoClient).close();
verify(mongoDatabase).getCollection(eq("collection"));
verify(mockPartitionCheckpoint).getGlobalS3FolderCreationStatus();
verify(mockPartitionCheckpoint).getGlobalS3FolderCreationStatus(collection);
verify(mockRecordConverter).initializePartitions(partitions);
verify(mockRecordConverter).convert(eq(doc1Json1), eq(timeSecond1 * 1000L), eq(timeSecond1 * 1000L), eq(operationType1));
verify(mockRecordConverter).convert(eq(doc1Json2), eq(timeSecond2 * 1000L), eq(timeSecond2 * 1000L), eq(operationType2));
Expand Down Expand Up @@ -249,7 +249,7 @@ void test_processStream_checkPointIntervalSuccess() {
S3PartitionStatus s3PartitionStatus = mock(S3PartitionStatus.class);
final List<String> partitions = List.of("first", "second");
when(s3PartitionStatus.getPartitions()).thenReturn(partitions);
when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus()).thenReturn(Optional.of(s3PartitionStatus));
when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus));
try (MockedStatic<MongoDBConnection> mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) {

mongoDBConnectionMockedStatic.when(() -> MongoDBConnection.getMongoClient(any(MongoDBSourceConfig.class)))
Expand All @@ -261,7 +261,7 @@ void test_processStream_checkPointIntervalSuccess() {
verify(mongoDatabase).getCollection(eq("collection"));
verify(cursor).close();
verify(cursor, times(4)).hasNext();
verify(mockPartitionCheckpoint).getGlobalS3FolderCreationStatus();
verify(mockPartitionCheckpoint).getGlobalS3FolderCreationStatus(collection);
verify(mockPartitionCheckpoint).checkpoint(resumeToken3, 3);
verify(successItemsCounter).increment(1);
verify(mockPartitionCheckpoint).checkpoint(resumeToken2, 2);
Expand All @@ -288,7 +288,7 @@ void test_processStream_stopWorker() {
when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable);
when(changeStreamIterable.iterator()).thenReturn(cursor);
S3PartitionStatus s3PartitionStatus = mock(S3PartitionStatus.class);
when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus()).thenReturn(Optional.of(s3PartitionStatus));
when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus));
final List<String> partitions = List.of("first", "second");
when(s3PartitionStatus.getPartitions()).thenReturn(partitions);
final ExecutorService executorService = Executors.newSingleThreadExecutor();
Expand Down

0 comments on commit b5dedda

Please sign in to comment.