Skip to content

Commit

Permalink
Fix DocDB export and stream processing self recovery with invalid dat…
Browse files Browse the repository at this point in the history
…abase or collection name

Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh committed May 20, 2024
1 parent 9d6510d commit 6cf4386
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.dataprepper.plugins.mongo.coordination.state.ExportProgressState;
import org.opensearch.dataprepper.plugins.mongo.model.ExportLoadStatus;
import org.opensearch.dataprepper.plugins.mongo.model.PartitionIdentifierBatch;
import org.opensearch.dataprepper.plugins.mongo.model.StreamLoadStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -26,6 +27,8 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

import static org.opensearch.dataprepper.plugins.mongo.stream.StreamWorker.STREAM_PREFIX;

public class ExportScheduler implements Runnable {
public static final String EXPORT_PREFIX = "EXPORT-";
private static final Logger LOG = LoggerFactory.getLogger(ExportScheduler.class);
Expand Down Expand Up @@ -83,12 +86,11 @@ public void run() {
final PartitionIdentifierBatch partitionIdentifierBatch = mongoDBExportPartitionSupplier.apply(exportPartition);

createDataQueryPartitions(
exportPartition.getCollection(), Instant.now(), partitionIdentifierBatch.getPartitionIdentifiers(),
(GlobalState) globalPartition.get());
Instant.now(), partitionIdentifierBatch.getPartitionIdentifiers(), (GlobalState) globalPartition.get());
updateExportPartition(exportPartition, partitionIdentifierBatch);

if (partitionIdentifierBatch.isLastBatch()) {
completeExportPartition(exportPartition);
completeExportPartition(exportPartition, partitionIdentifierBatch.getPartitionIdentifiers().isEmpty());
markTotalPartitionsAsComplete(exportPartition.getCollection());
}
}
Expand All @@ -111,9 +113,7 @@ public void run() {
LOG.warn("Export scheduler interrupted, looks like shutdown has triggered");
}

private boolean createDataQueryPartitions(final String collection,
final Instant exportTime,
final List<PartitionIdentifier> partitionIdentifiers,
private boolean createDataQueryPartitions(final Instant exportTime, final List<PartitionIdentifier> partitionIdentifiers,
final GlobalState globalState) {
AtomicLong totalQueries = new AtomicLong();
partitionIdentifiers.forEach(partitionIdentifier -> {
Expand Down Expand Up @@ -153,11 +153,17 @@ private void updateExportPartition(final ExportPartition exportPartition,
}
}

private void completeExportPartition(final ExportPartition exportPartition) {
/**
 * Finalizes an export partition once its last batch has been handled.
 *
 * <p>Increments the export job success counter, sets the partition's progress state status to
 * {@code COMPLETED_STATUS} and completes the partition through the source coordinator. When the
 * final batch carried no partition identifiers, it additionally logs that there is nothing to
 * export and creates a {@link GlobalState} keyed by {@code STREAM_PREFIX + collection}, seeded
 * with a {@link StreamLoadStatus} built from the current epoch-millisecond timestamp.
 *
 * @param exportPartition the export partition to mark as completed; its progress state is read
 *                        with {@code Optional.get()}, so it must be present
 * @param emptyPartition  {@code true} when the last batch contained no partition identifiers
 */
private void completeExportPartition(final ExportPartition exportPartition, final boolean emptyPartition) {
    exportJobSuccessCounter.increment();

    final ExportProgressState progressState = exportPartition.getProgressState().get();
    progressState.setStatus(COMPLETED_STATUS);
    enhancedSourceCoordinator.completePartition(exportPartition);

    if (!emptyPartition) {
        return;
    }

    // Nothing was exported; judging by the log message below, the stream global state is
    // created here so stream processing is not left waiting on export data.
    LOG.info("There are no records to export. Streaming can continue...");
    final StreamLoadStatus initialStreamStatus = new StreamLoadStatus(Instant.now().toEpochMilli());
    final String streamStateKey = STREAM_PREFIX + exportPartition.getCollection();
    enhancedSourceCoordinator.createPartition(new GlobalState(streamStateKey, initialStreamStatus.toMap()));
}

private void markTotalPartitionsAsComplete(final String collection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.DataQueryPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.mongo.coordination.state.ExportProgressState;
import org.opensearch.dataprepper.plugins.mongo.model.PartitionIdentifierBatch;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -53,6 +55,7 @@
import static org.opensearch.dataprepper.plugins.mongo.model.ExportLoadStatus.LOADED_RECORDS;
import static org.opensearch.dataprepper.plugins.mongo.model.ExportLoadStatus.TOTAL_PARTITIONS;
import static org.opensearch.dataprepper.plugins.mongo.model.ExportLoadStatus.TOTAL_PARTITIONS_COMPLETE;
import static org.opensearch.dataprepper.plugins.mongo.stream.StreamWorker.STREAM_PREFIX;

@ExtendWith(MockitoExtension.class)
public class ExportSchedulerTest {
Expand Down Expand Up @@ -127,33 +130,47 @@ void test_export_run() {
final Instant exportTime = Instant.now();
final String partitionKey = collection + "|" + UUID.randomUUID();

exportPartition = new ExportPartition(collection, partitionSize, exportTime, null);
final ExportProgressState exportProgressState = new ExportProgressState();
exportProgressState.setCollectionName(collection);
exportProgressState.setDatabaseName(collection);
exportProgressState.setExportTime(exportTime.toString());

exportPartition = new ExportPartition(collection, partitionSize, exportTime, exportProgressState);
given(partitionIdentifier.getPartitionKey()).willReturn(partitionKey);
given(mongoDBExportPartitionSupplier.apply(exportPartition)).willReturn(partitionIdentifierBatch);
given(partitionIdentifierBatch.getPartitionIdentifiers()).willReturn(List.of(partitionIdentifier));
given(partitionIdentifierBatch.isLastBatch()).willReturn(true);
given(partitionIdentifierBatch.getEndDocId()).willReturn(UUID.randomUUID().toString());
given(coordinator.getPartition(eq(EXPORT_PREFIX + collection))).willReturn(
Optional.empty(), Optional.of(globalState));
final long totalPartitions = Integer.valueOf(RANDOM.nextInt(10)).longValue();
final long totalPartitions = Integer.valueOf(RANDOM.nextInt(10)).longValue() + 1;
final Instant lastUpdateTimestamp = Instant.now().minus(1, ChronoUnit.MINUTES);
final Map<String, Object> progressState = Map.of(
final Map<String, Object> progressState1 = Map.of(
TOTAL_PARTITIONS, totalPartitions,
LOADED_PARTITIONS, 0L,
LOADED_RECORDS, 0L,
LAST_UPDATE_TIMESTAMP, lastUpdateTimestamp.toEpochMilli(),
TOTAL_PARTITIONS_COMPLETE, false
);
given(globalState.getProgressState()).willReturn(Optional.of(progressState));
final Map<String, Object> progressState2 = Map.of(
TOTAL_PARTITIONS, totalPartitions + 1,
LOADED_PARTITIONS, 0L,
LOADED_RECORDS, 0L,
LAST_UPDATE_TIMESTAMP, lastUpdateTimestamp.toEpochMilli(),
TOTAL_PARTITIONS_COMPLETE, false
);
given(globalState.getProgressState()).willReturn(Optional.of(progressState1)).willReturn(Optional.of(progressState2));
given(coordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).willReturn(Optional.of(exportPartition));

ExecutorService executorService = Executors.newSingleThreadExecutor();
final Future<?> future = executorService.submit(() -> exportScheduler.run());
await()
.atMost(Duration.ofMillis(DEFAULT_GET_PARTITION_BACKOFF_MILLIS).plus(Duration.ofSeconds(2)))
.atMost(Duration.ofMillis(DEFAULT_GET_PARTITION_BACKOFF_MILLIS).plus(Duration.ofSeconds(60)))
.untilAsserted(() -> verify(coordinator, times(1)).createPartition(any()));

future.cancel(true);

verify(coordinator, times(2)).getPartition(eq(EXPORT_PREFIX + collection));
verify(coordinator, times(3)).getPartition(eq(EXPORT_PREFIX + collection));

// Acquire the init partition
verify(coordinator).acquireAvailablePartition(eq(ExportPartition.PARTITION_TYPE));
Expand All @@ -173,14 +190,15 @@ void test_export_run() {
assertThat(partitions.get(0).getPartitionType(), equalTo(DataQueryPartition.PARTITION_TYPE));
});

verify(globalState).setProgressState(progressStateCaptor.capture());
verify(globalState, times(2)).setProgressState(progressStateCaptor.capture());
final Map<String, Object> updatedProgressState = progressStateCaptor.getValue();
assertThat(updatedProgressState.get(TOTAL_PARTITIONS), equalTo(totalPartitions + 1));
assertThat(updatedProgressState.get(LOADED_PARTITIONS), is(0L));
assertThat(updatedProgressState.get(LOADED_RECORDS), is(0L));
assertThat((Long) updatedProgressState.get(LAST_UPDATE_TIMESTAMP),
is(greaterThanOrEqualTo(lastUpdateTimestamp.toEpochMilli())));
verify(coordinator).saveProgressStateForPartition(eq(globalState), any());
verify(coordinator).saveProgressStateForPartition(eq(exportPartition), eq(null));
verify(coordinator, times(2)).saveProgressStateForPartition(eq(globalState), any());
verify(exportPartitionTotalCounter).increment(1);
executorService.shutdownNow();
}
Expand All @@ -192,8 +210,13 @@ void test_export_run_multiple_partitions() {
final int partitionSize = new Random().nextInt();
final Instant exportTime = Instant.now();
final String partitionKey = collection + "|" + UUID.randomUUID();

final ExportProgressState exportProgressState = new ExportProgressState();
exportProgressState.setCollectionName(collection);
exportProgressState.setDatabaseName(collection);
exportProgressState.setExportTime(exportTime.toString());

exportPartition = new ExportPartition(collection, partitionSize, exportTime, null);
exportPartition = new ExportPartition(collection, partitionSize, exportTime, exportProgressState);
given(partitionIdentifier.getPartitionKey()).willReturn(partitionKey);
given(mongoDBExportPartitionSupplier.apply(exportPartition)).willReturn(partitionIdentifierBatch);
given(partitionIdentifierBatch.getPartitionIdentifiers()).willReturn(List.of(partitionIdentifier, partitionIdentifier, partitionIdentifier));
Expand Down Expand Up @@ -247,4 +270,52 @@ void test_export_run_multiple_partitions() {
verify(exportPartitionTotalCounter).increment(3);
executorService.shutdownNow();
}

/**
 * Runs the export scheduler against a last batch that contains no partition identifiers and
 * checks that the export partition is completed and that exactly one global state, keyed by
 * {@code STREAM_PREFIX + collection}, is created through the coordinator.
 */
@Test
void test_exportRun_emptyPartitionIdentifier() {
    exportScheduler = new ExportScheduler(coordinator, mongoDBExportPartitionSupplier, pluginMetrics);

    // Export partition under test, carrying a populated progress state.
    final String collectionName = UUID.randomUUID().toString();
    final int batchSize = new Random().nextInt();
    final Instant exportStart = Instant.now();
    final ExportProgressState initialProgress = new ExportProgressState();
    initialProgress.setCollectionName(collectionName);
    initialProgress.setDatabaseName(collectionName);
    initialProgress.setExportTime(exportStart.toString());
    exportPartition = new ExportPartition(collectionName, batchSize, exportStart, initialProgress);

    // Progress map handed back by the mocked export global state.
    final long partitionTotal = (long) RANDOM.nextInt(10);
    final Instant previousUpdate = Instant.now().minus(1, ChronoUnit.MINUTES);
    final Map<String, Object> exportLoadProgress = Map.of(
            TOTAL_PARTITIONS, partitionTotal,
            LOADED_PARTITIONS, 0L,
            LOADED_RECORDS, 0L,
            LAST_UPDATE_TIMESTAMP, previousUpdate.toEpochMilli(),
            TOTAL_PARTITIONS_COMPLETE, false
    );

    // The supplier yields a final batch with an empty identifier list.
    given(mongoDBExportPartitionSupplier.apply(exportPartition)).willReturn(partitionIdentifierBatch);
    given(partitionIdentifierBatch.getPartitionIdentifiers()).willReturn(Collections.emptyList());
    given(partitionIdentifierBatch.isLastBatch()).willReturn(true);

    // First lookup of the export global state is empty; later lookups return the mock.
    given(coordinator.getPartition(eq(EXPORT_PREFIX + collectionName))).willReturn(
            Optional.empty(), Optional.of(globalState));
    given(globalState.getProgressState()).willReturn(Optional.of(exportLoadProgress));
    given(coordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).willReturn(Optional.of(exportPartition));

    final ExecutorService schedulerExecutor = Executors.newSingleThreadExecutor();
    final Future<?> schedulerRun = schedulerExecutor.submit(() -> exportScheduler.run());
    final Duration completionTimeout = Duration.ofMillis(DEFAULT_GET_PARTITION_BACKOFF_MILLIS).plusSeconds(2);
    await()
            .atMost(completionTimeout)
            .untilAsserted(() -> verify(coordinator).completePartition(exportPartition));

    // NOTE(review): the await above only waits for completePartition; the checks below cover
    // calls that may be issued after it on the scheduler thread - confirm they cannot race.
    schedulerRun.cancel(true);

    verify(coordinator, times(3)).getPartition(eq(EXPORT_PREFIX + collectionName));

    // Acquire the init partition
    verify(coordinator).acquireAvailablePartition(eq(ExportPartition.PARTITION_TYPE));

    // The only partition created must be the stream global state for this collection.
    final ArgumentCaptor<GlobalState> createdStateCaptor = ArgumentCaptor.forClass(GlobalState.class);
    verify(coordinator).createPartition(createdStateCaptor.capture());
    final GlobalState createdStreamState = createdStateCaptor.getValue();
    assertThat(createdStreamState.getPartitionKey(), is(STREAM_PREFIX + exportPartition.getCollection()));

    verify(coordinator).saveProgressStateForPartition(eq(globalState), eq(null));
    schedulerExecutor.shutdownNow();
}
}

0 comments on commit 6cf4386

Please sign in to comment.