Skip to content

Commit

Permalink
Fix issue where the source coordinator would skip creating partitions…
Browse files Browse the repository at this point in the history
… for new items for the os source

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 committed Sep 8, 2023
1 parent a63a83c commit 84127e3
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private void createPartitions(final List<PartitionIdentifier> partitionIdentifie
final Optional<SourcePartitionStoreItem> optionalPartitionItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionIdentifier.getPartitionKey());

if (optionalPartitionItem.isPresent()) {
return;
continue;
}

final boolean partitionCreated = sourceCoordinationStore.tryCreatePartitionItem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,17 @@ void getNextPartition_throws_UninitializedSourceCoordinatorException_when_called
}

@Test
void getNextPartition_calls_supplier_and_creates_partition_with_non_existing_item_when_partition_exists_and_is_created_successfully() {
void getNextPartition_calls_supplier_and_creates_partition_with_existing_then_non_existing_item_when_partition_exists_and_is_created_successfully() {
final PartitionIdentifier partitionIdentifier = PartitionIdentifier.builder().withPartitionKey(UUID.randomUUID().toString()).build();
final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier = (map) -> List.of(partitionIdentifier);
final PartitionIdentifier partitionIdentifierToSkip = PartitionIdentifier.builder().withPartitionKey(UUID.randomUUID().toString()).build();
final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier = (map) -> List.of(partitionIdentifierToSkip, partitionIdentifier);

given(sourceCoordinationStore.tryAcquireAvailablePartition(anyString(), anyString(), any())).willReturn(Optional.empty()).willReturn( Optional.empty());
given(globalStateForPartitionCreationItem.getSourcePartitionStatus()).willReturn(SourcePartitionStatus.UNASSIGNED);
given(globalStateForPartitionCreationItem.getPartitionOwner()).willReturn(null);
given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForGlobalState, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS)).willReturn(Optional.of(globalStateForPartitionCreationItem));
doNothing().when(sourceCoordinationStore).tryUpdateSourcePartitionItem(globalStateForPartitionCreationItem);
given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifierToSkip.getPartitionKey())).willReturn(Optional.of(sourcePartitionStoreItem));
given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey())).willReturn(Optional.empty());
given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null)).willReturn(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy;

import java.util.Objects;

@DataPrepperPlugin(name="opensearch", pluginType = Source.class, pluginConfigurationType = OpenSearchSourceConfiguration.class)
public class OpenSearchSource implements Source<Record<Event>>, UsesSourceCoordination {

Expand Down Expand Up @@ -72,7 +74,9 @@ public boolean areAcknowledgementsEnabled() {

@Override
public void stop() {
openSearchService.stop();
if (Objects.nonNull(openSearchService)) {
openSearchService.stop();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ private List<PartitionIdentifier> applyForOpenSearchClient(final Map<String, Obj
return Collections.emptyList();
}

LOG.debug("Found {} indices", indicesResponse.valueBody().size());

return indicesResponse.valueBody().stream()
.filter(osIndicesRecord -> shouldIndexBeProcessed(osIndicesRecord.index()))
.map(indexRecord -> PartitionIdentifier.builder().withPartitionKey(indexRecord.index()).build())
Expand All @@ -91,6 +93,8 @@ private List<PartitionIdentifier> applyForElasticSearchClient(final Map<String,
return Collections.emptyList();
}

LOG.debug("Found {} indices", indicesResponse.valueBody().size());

return indicesResponse.valueBody().stream()
.filter(esIndicesRecord -> shouldIndexBeProcessed(esIndicesRecord.index()))
.map(indexRecord -> PartitionIdentifier.builder().withPartitionKey(indexRecord.index()).build())
Expand Down

0 comments on commit 84127e3

Please sign in to comment.