From 84127e35cfa27e5b4a39322d4776707d481291e5 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Fri, 8 Sep 2023 14:47:53 -0500 Subject: [PATCH] Fix issue where the source coordinator would skip creating partitions for new items for the os source Signed-off-by: Taylor Gray --- .../sourcecoordination/LeaseBasedSourceCoordinator.java | 2 +- .../sourcecoordination/LeaseBasedSourceCoordinatorTest.java | 6 ++++-- .../plugins/source/opensearch/OpenSearchSource.java | 6 +++++- .../worker/OpenSearchIndexPartitionCreationSupplier.java | 4 ++++ 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java index a5eced3dcd..c920cf59a6 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java @@ -183,7 +183,7 @@ private void createPartitions(final List partitionIdentifie final Optional optionalPartitionItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionIdentifier.getPartitionKey()); if (optionalPartitionItem.isPresent()) { - return; + continue; } final boolean partitionCreated = sourceCoordinationStore.tryCreatePartitionItem( diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java index 48986f337b..1bcb39d7e7 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java @@ -188,15 +188,17 @@ void getNextPartition_throws_UninitializedSourceCoordinatorException_when_called } @Test - void getNextPartition_calls_supplier_and_creates_partition_with_non_existing_item_when_partition_exists_and_is_created_successfully() { + void getNextPartition_calls_supplier_and_creates_partition_with_existing_then_non_existing_item_when_partition_exists_and_is_created_successfully() { final PartitionIdentifier partitionIdentifier = PartitionIdentifier.builder().withPartitionKey(UUID.randomUUID().toString()).build(); - final Function, List> partitionCreationSupplier = (map) -> List.of(partitionIdentifier); + final PartitionIdentifier partitionIdentifierToSkip = PartitionIdentifier.builder().withPartitionKey(UUID.randomUUID().toString()).build(); + final Function, List> partitionCreationSupplier = (map) -> List.of(partitionIdentifierToSkip, partitionIdentifier); given(sourceCoordinationStore.tryAcquireAvailablePartition(anyString(), anyString(), any())).willReturn(Optional.empty()).willReturn( Optional.empty()); given(globalStateForPartitionCreationItem.getSourcePartitionStatus()).willReturn(SourcePartitionStatus.UNASSIGNED); given(globalStateForPartitionCreationItem.getPartitionOwner()).willReturn(null); given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForGlobalState, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS)).willReturn(Optional.of(globalStateForPartitionCreationItem)); doNothing().when(sourceCoordinationStore).tryUpdateSourcePartitionItem(globalStateForPartitionCreationItem); + given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifierToSkip.getPartitionKey())).willReturn(Optional.of(sourcePartitionStoreItem)); given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey())).willReturn(Optional.empty()); given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null)).willReturn(true); diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java index 681e22b075..aa03d93c94 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java @@ -20,6 +20,8 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy; +import java.util.Objects; + @DataPrepperPlugin(name="opensearch", pluginType = Source.class, pluginConfigurationType = OpenSearchSourceConfiguration.class) public class OpenSearchSource implements Source>, UsesSourceCoordination { @@ -72,7 +74,9 @@ public boolean areAcknowledgementsEnabled() { @Override public void stop() { - openSearchService.stop(); + if (Objects.nonNull(openSearchService)) { + openSearchService.stop(); + } } @Override diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/OpenSearchIndexPartitionCreationSupplier.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/OpenSearchIndexPartitionCreationSupplier.java index 6287da71a4..4960adc8a8 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/OpenSearchIndexPartitionCreationSupplier.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/OpenSearchIndexPartitionCreationSupplier.java @@ -76,6 +76,8 @@ private List applyForOpenSearchClient(final Map shouldIndexBeProcessed(osIndicesRecord.index())) .map(indexRecord -> PartitionIdentifier.builder().withPartitionKey(indexRecord.index()).build()) @@ -91,6 +93,8 @@ private List applyForElasticSearchClient(final Map shouldIndexBeProcessed(esIndicesRecord.index())) .map(indexRecord -> PartitionIdentifier.builder().withPartitionKey(indexRecord.index()).build())