Skip to content

Commit

Permalink
Fix issue of skipping new partitions/indices for the opensearch source (
Browse files Browse the repository at this point in the history
#3319)

Fix issue where the source coordinator would skip creating partitions for new items for the os source

Signed-off-by: Taylor Gray <[email protected]>

---------

Signed-off-by: Taylor Gray <[email protected]>
(cherry picked from commit 778e9c7)
  • Loading branch information
graytaylor0 authored and github-actions[bot] committed Sep 25, 2023
1 parent a3f1eeb commit b6da32d
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private void createPartitions(final List<PartitionIdentifier> partitionIdentifie
final Optional<SourcePartitionStoreItem> optionalPartitionItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionIdentifier.getPartitionKey());

if (optionalPartitionItem.isPresent()) {
return;
continue;
}

final boolean partitionCreated = sourceCoordinationStore.tryCreatePartitionItem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,17 @@ void getNextPartition_throws_UninitializedSourceCoordinatorException_when_called
}

@Test
void getNextPartition_calls_supplier_and_creates_partition_with_non_existing_item_when_partition_exists_and_is_created_successfully() {
void getNextPartition_calls_supplier_and_creates_partition_with_existing_then_non_existing_item_when_partition_exists_and_is_created_successfully() {
final PartitionIdentifier partitionIdentifier = PartitionIdentifier.builder().withPartitionKey(UUID.randomUUID().toString()).build();
final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier = (map) -> List.of(partitionIdentifier);
final PartitionIdentifier partitionIdentifierToSkip = PartitionIdentifier.builder().withPartitionKey(UUID.randomUUID().toString()).build();
final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier = (map) -> List.of(partitionIdentifierToSkip, partitionIdentifier);

given(sourceCoordinationStore.tryAcquireAvailablePartition(anyString(), anyString(), any())).willReturn(Optional.empty()).willReturn( Optional.empty());
given(globalStateForPartitionCreationItem.getSourcePartitionStatus()).willReturn(SourcePartitionStatus.UNASSIGNED);
given(globalStateForPartitionCreationItem.getPartitionOwner()).willReturn(null);
given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForGlobalState, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS)).willReturn(Optional.of(globalStateForPartitionCreationItem));
doNothing().when(sourceCoordinationStore).tryUpdateSourcePartitionItem(globalStateForPartitionCreationItem);
given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifierToSkip.getPartitionKey())).willReturn(Optional.of(sourcePartitionStoreItem));
given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey())).willReturn(Optional.empty());
given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null)).willReturn(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy;

import java.util.Objects;

@DataPrepperPlugin(name="opensearch", pluginType = Source.class, pluginConfigurationType = OpenSearchSourceConfiguration.class)
public class OpenSearchSource implements Source<Record<Event>>, UsesSourceCoordination {

Expand Down Expand Up @@ -56,7 +58,9 @@ private void startProcess(final OpenSearchSourceConfiguration openSearchSourceCo

@Override
public void stop() {
openSearchService.stop();
if (Objects.nonNull(openSearchService)) {
openSearchService.stop();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ private List<PartitionIdentifier> applyForOpenSearchClient(final Map<String, Obj
return Collections.emptyList();
}

LOG.debug("Found {} indices", indicesResponse.valueBody().size());

return indicesResponse.valueBody().stream()
.filter(osIndicesRecord -> shouldIndexBeProcessed(osIndicesRecord.index()))
.map(indexRecord -> PartitionIdentifier.builder().withPartitionKey(indexRecord.index()).build())
Expand All @@ -91,6 +93,8 @@ private List<PartitionIdentifier> applyForElasticSearchClient(final Map<String,
return Collections.emptyList();
}

LOG.debug("Found {} indices", indicesResponse.valueBody().size());

return indicesResponse.valueBody().stream()
.filter(esIndicesRecord -> shouldIndexBeProcessed(esIndicesRecord.index()))
.map(indexRecord -> PartitionIdentifier.builder().withPartitionKey(indexRecord.index()).build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ private List<PartitionIdentifier> listFilteredS3ObjectsForBucket(final List<Stri
final LocalDateTime startDateTime,
final LocalDateTime endDateTime,
final Map<String, Object> globalStateMap) {

Instant mostRecentLastModifiedTimestamp = globalStateMap.get(bucket) != null ? Instant.parse((String) globalStateMap.get(bucket)) : null;
final List<PartitionIdentifier> allPartitionIdentifiers = new ArrayList<>();
ListObjectsV2Response listObjectsV2Response = null;
Expand Down

0 comments on commit b6da32d

Please sign in to comment.