diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java index e6b4a728f4..2b5c6b885b 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java @@ -23,10 +23,13 @@ import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig; import org.opensearch.dataprepper.plugins.mongo.coordination.partition.DataQueryPartition; +import java.time.Duration; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -79,7 +82,7 @@ public void setup() { "test.collection|0|1|java.lang.Long", "test.collection|000000000000000000000000|000000000000000000000001|org.bson.types.ObjectId" }) - public void testProcessPartitionSuccess(final String partitionKey) throws Exception { + public void testProcessPartitionSuccess(final String partitionKey) { when(sourceCoordinator.acquireAvailablePartition(DataQueryPartition.PARTITION_TYPE)).thenReturn(Optional.of(dataQueryPartition)); final ExecutorService executorService = Executors.newSingleThreadExecutor(); @@ -106,7 +109,7 @@ public void testProcessPartitionSuccess(final String partitionKey) throws Except lenient().when(sourceCoordinator.acquireAvailablePartition(DataQueryPartition.PARTITION_TYPE)) .thenReturn(Optional.of(dataQueryPartition)); - executorService.submit(() -> { + final Future future = executorService.submit(() -> { try (MockedStatic mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) { mongoDBConnectionMockedStatic.when(() -> MongoDBConnection.getMongoClient(any(MongoDBSourceConfig.class))) .thenReturn(mongoClient); @@ -114,13 +117,21 @@ public void testProcessPartitionSuccess(final String partitionKey) throws Except } }); - Thread.sleep(100); - executorService.shutdownNow(); - // Then dependencies are called - verify(mongoClient).getDatabase(eq("test")); + await() + .atMost(Duration.ofSeconds(2)) + .untilAsserted(() -> verify(mongoClient).getDatabase(eq("test"))); + + future.cancel(true); + + await() + .atMost(Duration.ofSeconds(2)) + .untilAsserted(() -> verify(mockPartitionCheckpoint, times(2)).checkpoint(2)); + verify(mongoClient, times(1)).close(); verify(mongoDatabase).getCollection(eq("collection")); + verify(mockRecordBufferWriter).writeToBuffer(eq(mockAcknowledgementSet), any()); verify(successItemsCounter, times(2)).increment(); verify(failureItemsCounter, never()).increment(); + executorService.shutdownNow(); } } \ No newline at end of file