diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportScheduler.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportScheduler.java
index f5de6b2e2f..7358a1f8e4 100644
--- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportScheduler.java
+++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportScheduler.java
@@ -17,6 +17,7 @@
 import org.opensearch.dataprepper.plugins.mongo.coordination.state.ExportProgressState;
 import org.opensearch.dataprepper.plugins.mongo.model.ExportLoadStatus;
 import org.opensearch.dataprepper.plugins.mongo.model.PartitionIdentifierBatch;
+import org.opensearch.dataprepper.plugins.mongo.model.StreamLoadStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -26,6 +27,8 @@
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.opensearch.dataprepper.plugins.mongo.stream.StreamWorker.STREAM_PREFIX;
+
 public class ExportScheduler implements Runnable {
     public static final String EXPORT_PREFIX = "EXPORT-";
     private static final Logger LOG = LoggerFactory.getLogger(ExportScheduler.class);
@@ -83,12 +86,11 @@ public void run() {
                     final PartitionIdentifierBatch partitionIdentifierBatch = mongoDBExportPartitionSupplier.apply(exportPartition);
                     createDataQueryPartitions(
-                            exportPartition.getCollection(), Instant.now(), partitionIdentifierBatch.getPartitionIdentifiers(),
-                            (GlobalState) globalPartition.get());
+                            Instant.now(), partitionIdentifierBatch.getPartitionIdentifiers(), (GlobalState) globalPartition.get());
                     updateExportPartition(exportPartition, partitionIdentifierBatch);
                     if (partitionIdentifierBatch.isLastBatch()) {
-                        completeExportPartition(exportPartition);
+                        completeExportPartition(exportPartition, partitionIdentifierBatch.getPartitionIdentifiers().isEmpty());
                         markTotalPartitionsAsComplete(exportPartition.getCollection());
                     }
                 }
@@ -111,9 +113,7 @@ public void run() {
         LOG.warn("Export scheduler interrupted, looks like shutdown has triggered");
     }
 
-    private boolean createDataQueryPartitions(final String collection,
-                                              final Instant exportTime,
-                                              final List<PartitionIdentifier> partitionIdentifiers,
+    private boolean createDataQueryPartitions(final Instant exportTime, final List<PartitionIdentifier> partitionIdentifiers,
                                               final GlobalState globalState) {
         AtomicLong totalQueries = new AtomicLong();
         partitionIdentifiers.forEach(partitionIdentifier -> {
@@ -153,11 +153,17 @@ private void updateExportPartition(final ExportPartition exportPartition,
         }
     }
 
-    private void completeExportPartition(final ExportPartition exportPartition) {
+    private void completeExportPartition(final ExportPartition exportPartition, final boolean emptyPartition) {
         exportJobSuccessCounter.increment();
         final ExportProgressState state = exportPartition.getProgressState().get();
         state.setStatus(COMPLETED_STATUS);
         enhancedSourceCoordinator.completePartition(exportPartition);
+        if (emptyPartition) {
+            LOG.info("There are no records to export. Streaming can continue...");
+            final StreamLoadStatus streamLoadStatus = new StreamLoadStatus(Instant.now().toEpochMilli());
+            enhancedSourceCoordinator.createPartition(
+                    new GlobalState(STREAM_PREFIX + exportPartition.getCollection(), streamLoadStatus.toMap()));
+        }
     }
 
     private void markTotalPartitionsAsComplete(final String collection) {
diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportSchedulerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportSchedulerTest.java
index 35c27bfae4..b01a4e5f8c 100644
--- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportSchedulerTest.java
+++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportSchedulerTest.java
@@ -15,11 +15,13 @@
 import org.opensearch.dataprepper.plugins.mongo.coordination.partition.DataQueryPartition;
 import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition;
 import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState;
+import org.opensearch.dataprepper.plugins.mongo.coordination.state.ExportProgressState;
 import org.opensearch.dataprepper.plugins.mongo.model.PartitionIdentifierBatch;
 
 import java.time.Duration;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -53,6 +55,7 @@
 import static org.opensearch.dataprepper.plugins.mongo.model.ExportLoadStatus.LOADED_RECORDS;
 import static org.opensearch.dataprepper.plugins.mongo.model.ExportLoadStatus.TOTAL_PARTITIONS;
 import static org.opensearch.dataprepper.plugins.mongo.model.ExportLoadStatus.TOTAL_PARTITIONS_COMPLETE;
+import static org.opensearch.dataprepper.plugins.mongo.stream.StreamWorker.STREAM_PREFIX;
 
 @ExtendWith(MockitoExtension.class)
 public class ExportSchedulerTest {
@@ -127,33 +130,47 @@ void test_export_run() {
         final Instant exportTime = Instant.now();
         final String partitionKey = collection + "|" + UUID.randomUUID();
-        exportPartition = new ExportPartition(collection, partitionSize, exportTime, null);
+        final ExportProgressState exportProgressState = new ExportProgressState();
+        exportProgressState.setCollectionName(collection);
+        exportProgressState.setDatabaseName(collection);
+        exportProgressState.setExportTime(exportTime.toString());
+
+        exportPartition = new ExportPartition(collection, partitionSize, exportTime, exportProgressState);
         given(partitionIdentifier.getPartitionKey()).willReturn(partitionKey);
         given(mongoDBExportPartitionSupplier.apply(exportPartition)).willReturn(partitionIdentifierBatch);
         given(partitionIdentifierBatch.getPartitionIdentifiers()).willReturn(List.of(partitionIdentifier));
+        given(partitionIdentifierBatch.isLastBatch()).willReturn(true);
+        given(partitionIdentifierBatch.getEndDocId()).willReturn(UUID.randomUUID().toString());
         given(coordinator.getPartition(eq(EXPORT_PREFIX + collection))).willReturn(
                 Optional.empty(), Optional.of(globalState));
-        final long totalPartitions = Integer.valueOf(RANDOM.nextInt(10)).longValue();
+        final long totalPartitions = Integer.valueOf(RANDOM.nextInt(10)).longValue() + 1;
         final Instant lastUpdateTimestamp = Instant.now().minus(1, ChronoUnit.MINUTES);
-        final Map<String, Object> progressState = Map.of(
+        final Map<String, Object> progressState1 = Map.of(
                 TOTAL_PARTITIONS, totalPartitions,
                 LOADED_PARTITIONS, 0L,
                 LOADED_RECORDS, 0L,
                 LAST_UPDATE_TIMESTAMP, lastUpdateTimestamp.toEpochMilli(),
                 TOTAL_PARTITIONS_COMPLETE, false
         );
-        given(globalState.getProgressState()).willReturn(Optional.of(progressState));
+        final Map<String, Object> progressState2 = Map.of(
+                TOTAL_PARTITIONS, totalPartitions + 1,
+                LOADED_PARTITIONS, 0L,
+                LOADED_RECORDS, 0L,
+                LAST_UPDATE_TIMESTAMP, lastUpdateTimestamp.toEpochMilli(),
+                TOTAL_PARTITIONS_COMPLETE, false
+        );
+        given(globalState.getProgressState()).willReturn(Optional.of(progressState1)).willReturn(Optional.of(progressState2));
         given(coordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).willReturn(Optional.of(exportPartition));
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         final Future<?> future = executorService.submit(() -> exportScheduler.run());
         await()
-                .atMost(Duration.ofMillis(DEFAULT_GET_PARTITION_BACKOFF_MILLIS).plus(Duration.ofSeconds(2)))
+                .atMost(Duration.ofMillis(DEFAULT_GET_PARTITION_BACKOFF_MILLIS).plus(Duration.ofSeconds(60)))
                 .untilAsserted(() -> verify(coordinator, times(1)).createPartition(any()));
 
         future.cancel(true);
 
-        verify(coordinator, times(2)).getPartition(eq(EXPORT_PREFIX + collection));
+        verify(coordinator, times(3)).getPartition(eq(EXPORT_PREFIX + collection));
 
         // Acquire the init partition
         verify(coordinator).acquireAvailablePartition(eq(ExportPartition.PARTITION_TYPE));
@@ -173,14 +190,15 @@ void test_export_run() {
             assertThat(partitions.get(0).getPartitionType(), equalTo(DataQueryPartition.PARTITION_TYPE));
         });
 
-        verify(globalState).setProgressState(progressStateCaptor.capture());
+        verify(globalState, times(2)).setProgressState(progressStateCaptor.capture());
         final Map<String, Object> updatedProgressState = progressStateCaptor.getValue();
         assertThat(updatedProgressState.get(TOTAL_PARTITIONS), equalTo(totalPartitions + 1));
         assertThat(updatedProgressState.get(LOADED_PARTITIONS), is(0L));
         assertThat(updatedProgressState.get(LOADED_RECORDS), is(0L));
         assertThat((Long) updatedProgressState.get(LAST_UPDATE_TIMESTAMP), is(greaterThanOrEqualTo(lastUpdateTimestamp.toEpochMilli())));
-        verify(coordinator).saveProgressStateForPartition(eq(globalState), any());
+        verify(coordinator).saveProgressStateForPartition(eq(exportPartition), eq(null));
+        verify(coordinator, times(2)).saveProgressStateForPartition(eq(globalState), any());
         verify(exportPartitionTotalCounter).increment(1);
         executorService.shutdownNow();
     }
@@ -192,8 +210,13 @@ void test_export_run_multiple_partitions() {
         final int partitionSize = new Random().nextInt();
         final Instant exportTime = Instant.now();
         final String partitionKey = collection + "|" + UUID.randomUUID();
+
+        final ExportProgressState exportProgressState = new ExportProgressState();
+        exportProgressState.setCollectionName(collection);
+        exportProgressState.setDatabaseName(collection);
+        exportProgressState.setExportTime(exportTime.toString());
 
-        exportPartition = new ExportPartition(collection, partitionSize, exportTime, null);
+        exportPartition = new ExportPartition(collection, partitionSize, exportTime, exportProgressState);
         given(partitionIdentifier.getPartitionKey()).willReturn(partitionKey);
         given(mongoDBExportPartitionSupplier.apply(exportPartition)).willReturn(partitionIdentifierBatch);
         given(partitionIdentifierBatch.getPartitionIdentifiers()).willReturn(List.of(partitionIdentifier, partitionIdentifier, partitionIdentifier));
@@ -247,4 +270,52 @@ void test_export_run_multiple_partitions() {
         verify(exportPartitionTotalCounter).increment(3);
         executorService.shutdownNow();
     }
+
+    @Test
+    void test_exportRun_emptyPartitionIdentifier() {
+        exportScheduler = new ExportScheduler(coordinator, mongoDBExportPartitionSupplier, pluginMetrics);
+        final String collection = UUID.randomUUID().toString();
+        final int partitionSize = new Random().nextInt();
+        final Instant exportTime = Instant.now();
+        final ExportProgressState exportProgressState = new ExportProgressState();
+        exportProgressState.setCollectionName(collection);
+        exportProgressState.setDatabaseName(collection);
+        exportProgressState.setExportTime(exportTime.toString());
+        exportPartition = new ExportPartition(collection, partitionSize, exportTime, exportProgressState);
+        given(mongoDBExportPartitionSupplier.apply(exportPartition)).willReturn(partitionIdentifierBatch);
+        given(partitionIdentifierBatch.getPartitionIdentifiers()).willReturn(Collections.emptyList());
+        given(partitionIdentifierBatch.isLastBatch()).willReturn(true);
+        given(coordinator.getPartition(eq(EXPORT_PREFIX + collection))).willReturn(
+                Optional.empty(), Optional.of(globalState));
+        final long totalPartitions = Integer.valueOf(RANDOM.nextInt(10)).longValue();
+        final Instant lastUpdateTimestamp = Instant.now().minus(1, ChronoUnit.MINUTES);
+        final Map<String, Object> progressState = Map.of(
+                TOTAL_PARTITIONS, totalPartitions,
+                LOADED_PARTITIONS, 0L,
+                LOADED_RECORDS, 0L,
+                LAST_UPDATE_TIMESTAMP, lastUpdateTimestamp.toEpochMilli(),
+                TOTAL_PARTITIONS_COMPLETE, false
+        );
+        given(globalState.getProgressState()).willReturn(Optional.of(progressState));
+        given(coordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).willReturn(Optional.of(exportPartition));
+
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        final Future<?> future = executorService.submit(() -> exportScheduler.run());
+        await()
+                .atMost(Duration.ofMillis(DEFAULT_GET_PARTITION_BACKOFF_MILLIS).plus(Duration.ofSeconds(2)))
+                .untilAsserted(() -> verify(coordinator).completePartition(exportPartition));
+
+        future.cancel(true);
+
+        verify(coordinator, times(3)).getPartition(eq(EXPORT_PREFIX + collection));
+
+        // Acquire the init partition
+        verify(coordinator).acquireAvailablePartition(eq(ExportPartition.PARTITION_TYPE));
+        final ArgumentCaptor<GlobalState> argumentCaptor = ArgumentCaptor.forClass(GlobalState.class);
+        verify(coordinator).createPartition(argumentCaptor.capture());
+        final GlobalState streamGlobalState = argumentCaptor.getValue();
+        assertThat(streamGlobalState.getPartitionKey(), is(STREAM_PREFIX + exportPartition.getCollection()));
+        verify(coordinator).saveProgressStateForPartition(eq(globalState), eq(null));
+        executorService.shutdownNow();
+    }
 }