Skip to content

Commit

Permalink
Add _id as additional sort key for point-in-time and search_after (#3374)
Browse files Browse the repository at this point in the history

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Sep 26, 2023
1 parent 5fdf95f commit 3689b12
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,40 @@

package org.opensearch.dataprepper.plugins.source.opensearch.configuration;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.validation.constraints.AssertTrue;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SearchConfiguration {

private static final ObjectMapper objectMapper = new ObjectMapper();
private static final Logger LOG = LoggerFactory.getLogger(SearchConfiguration.class);

@JsonProperty("search_context_type")
private SearchContextType searchContextType;
private String searchContextType;

@JsonProperty("batch_size")
private Integer batchSize = 1000;

@JsonIgnore
private SearchContextType searchContextTypeValue;

/**
 * Returns the search context type for this configuration.
 *
 * <p>The {@code searchContextTypeValue} field is only assigned inside
 * {@code isSearchContextTypeValid()}, so that return path yields {@code null} until the
 * validation method has run, and also when {@code search_context_type} is absent or invalid.
 *
 * @return the search context type, or {@code null} if none has been set
 */
public SearchContextType getSearchContextType() {
// NOTE(review): the two return statements below appear to be the removed and the added line
// of the commit diff, rendered without +/- markers — confirm against the committed file.
return searchContextType;
return searchContextTypeValue;
}

/**
 * Returns the value bound from the {@code batch_size} setting.
 *
 * @return the configured batch size; {@code 1000} when the setting is not supplied
 */
public Integer getBatchSize() {
    return this.batchSize;
}

/**
 * Validation hook that checks the raw {@code search_context_type} string and, as a side effect,
 * parses it into {@code searchContextTypeValue} (the value returned by
 * {@code getSearchContextType()}).
 *
 * <p>A missing ({@code null}) value is considered valid and leaves the parsed value untouched.
 * Matching is case-insensitive: the string is upper-cased and looked up as a
 * {@link SearchContextType} constant name.
 *
 * @return {@code true} if the setting is absent or names a {@link SearchContextType} constant;
 *         {@code false} otherwise
 */
@AssertTrue(message = "search_context_type must be one of [ 'scroll', 'point_in_time', 'none' ]")
boolean isSearchContextTypeValid() {
    if (searchContextType == null) {
        return true;
    }

    try {
        // Upper-case with Locale.ROOT: the no-arg toUpperCase() uses the JVM default locale, and
        // under e.g. a Turkish locale "point_in_time" becomes "POİNT_İN_TİME" (dotted capital I),
        // which would make valueOf() reject a perfectly valid setting.
        searchContextTypeValue = SearchContextType.valueOf(searchContextType.toUpperCase(java.util.Locale.ROOT));
        return true;
    } catch (final IllegalArgumentException e) {
        // valueOf() throws when no constant matches; report that as a validation failure
        // instead of propagating the exception.
        return false;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.FieldSort;
import co.elastic.clients.elasticsearch._types.ScoreSort;
import co.elastic.clients.elasticsearch._types.SortOptions;
import co.elastic.clients.elasticsearch._types.SortOrder;
Expand Down Expand Up @@ -111,7 +112,10 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest
.id(searchPointInTimeRequest.getPitId())
.keepAlive(Time.of(time -> time.time(searchPointInTimeRequest.getKeepAlive())))))
.size(searchPointInTimeRequest.getPaginationSize())
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
.sort(List.of(
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSortBuilder -> fieldSortBuilder.field("_id").order(SortOrder.Asc)))))
)
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));

if (Objects.nonNull(searchPointInTimeRequest.getSearchAfter())) {
Expand Down Expand Up @@ -147,6 +151,7 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR
try {
searchResponse = elasticsearchClient.search(SearchRequest.of(request -> request
.scroll(Time.of(time -> time.time(createScrollRequest.getScrollTime())))
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
.size(createScrollRequest.getSize())
.index(createScrollRequest.getIndex())), ObjectNode.class);
} catch (final ElasticsearchException e) {
Expand Down Expand Up @@ -214,7 +219,10 @@ public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchCon
builder
.index(noSearchContextSearchRequest.getIndex())
.size(noSearchContextSearchRequest.getPaginationSize())
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
.sort(List.of(
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSortBuilder -> fieldSortBuilder.field("_id").order(SortOrder.Asc)))))
)
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));

if (Objects.nonNull(noSearchContextSearchRequest.getSearchAfter())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.node.ObjectNode;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.FieldSort;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch._types.ScoreSort;
import org.opensearch.client.opensearch._types.SortOptions;
Expand Down Expand Up @@ -111,8 +112,11 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest
builder
.pit(Pit.of(pitBuilder -> pitBuilder.id(searchPointInTimeRequest.getPitId()).keepAlive(searchPointInTimeRequest.getKeepAlive())))
.size(searchPointInTimeRequest.getPaginationSize())
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery.queryName("*")))));
.sort(List.of(
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc)))))
)
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));

if (Objects.nonNull(searchPointInTimeRequest.getSearchAfter())) {
builder.searchAfter(searchPointInTimeRequest.getSearchAfter());
Expand Down Expand Up @@ -145,6 +149,7 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR
try {
searchResponse = openSearchClient.search(SearchRequest.of(request -> request
.scroll(Time.of(time -> time.time(createScrollRequest.getScrollTime())))
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
.size(createScrollRequest.getSize())
.index(createScrollRequest.getIndex())), ObjectNode.class);
} catch (final OpenSearchException e) {
Expand Down Expand Up @@ -211,8 +216,11 @@ public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchCon
builder
.index(noSearchContextSearchRequest.getIndex())
.size(noSearchContextSearchRequest.getPaginationSize())
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery.queryName("*")))));
.sort(List.of(
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc)))))
)
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));

if (Objects.nonNull(noSearchContextSearchRequest.getSearchAfter())) {
builder.searchAfter(noSearchContextSearchRequest.getSearchAfter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,13 @@

package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model;

import com.fasterxml.jackson.annotation.JsonValue;

/**
 * The search context strategies that can be named by the {@code search_context_type} setting.
 *
 * <p>NOTE(review): this listing comes from a commit diff rendered without +/- markers, so some
 * of the members shown below may be lines the commit removes — confirm against the committed file.
 */
public enum SearchContextType {
SCROLL("scroll"),
POINT_IN_TIME("point_in_time"),
NONE("none");

// Lower-case string form associated with the constant.
private final String searchContextType;

SearchContextType(final String searchContextType) {
this.searchContextType = searchContextType;
}

/**
 * Returns the string form of this constant; {@code @JsonValue} marks it as the value Jackson
 * uses when serializing the enum.
 */
@JsonValue
public String getSearchContextType() {
return searchContextType;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;

import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;

public class SearchConfigurationTest {
Expand All @@ -25,24 +27,33 @@ void default_search_configuration() {
final SearchConfiguration searchConfiguration = new SearchConfiguration();

assertThat(searchConfiguration.getBatchSize(), equalTo(1000));
assertThat(searchConfiguration.getSearchContextType(), nullValue());
}

// Verifies that batch_size and a valid search_context_type supplied as plugin settings are
// bound onto SearchConfiguration and that the type is exposed as the matching enum constant.
@Test
void non_default_search_configuration() {
final Map<String, Object> pluginSettings = new HashMap<>();
pluginSettings.put("batch_size", 2000);
pluginSettings.put("search_context_type", "scroll");

final SearchConfiguration searchConfiguration = objectMapper.convertValue(pluginSettings, SearchConfiguration.class);
assertThat(searchConfiguration.getBatchSize(),equalTo(2000));
// Order matters: isSearchContextTypeValid() is what parses the raw string into the enum value,
// so it has to be invoked before getSearchContextType() is checked.
assertThat(searchConfiguration.isSearchContextTypeValid(), equalTo(true));
assertThat(searchConfiguration.getSearchContextType(), equalTo(SearchContextType.SCROLL));
}

// Verifies that an unrecognised search_context_type fails validation and leaves the parsed
// search context type unset.
// NOTE(review): the two method-name lines below appear to be the removed and the added line of
// the commit diff, rendered without +/- markers — confirm against the committed file.
@Test
void query_is_not_valid_json_string() {
void search_context_type_invalid() {

final Map<String, Object> pluginSettings = new HashMap<>();
pluginSettings.put("batch_size", 1000);
pluginSettings.put("search_context_type", "invalid");


final SearchConfiguration searchConfiguration = objectMapper.convertValue(pluginSettings, SearchConfiguration.class);
assertThat(searchConfiguration.getBatchSize(),equalTo(1000));

// "invalid" matches no SearchContextType constant, so validation reports false and the
// getter still returns null afterwards.
assertThat(searchConfiguration.isSearchContextTypeValid(), equalTo(false));
assertThat(searchConfiguration.getSearchContextType(), nullValue());
}
}

0 comments on commit 3689b12

Please sign in to comment.