Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: support index template for serverless #3071

Merged
merged 4 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion data-prepper-plugins/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ With the `document_root_key` set to `status`. The document structure would be `{
* `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
* `sts_role_arn` (Optional) : The STS role to assume for requests to AWS. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).
* `sts_header_overrides` (Optional): A map of header overrides to make when assuming the IAM role for the sink plugin.
* `serverless` (Optional): A boolean flag to indicate the OpenSearch backend is Amazon OpenSearch Serverless. Default to `false`.
* `serverless` (Optional): A boolean flag to indicate the OpenSearch backend is Amazon OpenSearch Serverless. Default to `false`. Notice that [ISM policies.](https://opensearch.org/docs/latest/im-plugin/ism/policies/) is not supported in Amazon OpenSearch Serverless and thus any ISM related configuration value has no effect, i.e. `ism_policy_file`.

## Metrics
### Management Disabled Index Type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import org.apache.http.HttpStatus;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpenSearchException;
Expand Down Expand Up @@ -39,6 +40,7 @@ public abstract class AbstractIndexManager implements IndexManager {
= "Invalid alias name [%s], an index exists with the same name as the alias";
public static final String INVALID_INDEX_ALIAS_ERROR
= "invalid_index_name_exception";
static final Set<Integer> NO_ISM_HTTP_STATUS = Set.of(HttpStatus.SC_NOT_FOUND, HttpStatus.SC_BAD_REQUEST);
private static final String TIME_PATTERN_STARTING_SYMBOLS = "%{";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
protected RestHighLevelClient restHighLevelClient;
Expand Down Expand Up @@ -178,9 +180,16 @@ final boolean checkISMEnabled() throws IOException {
final GetClusterSettingsRequest request = new GetClusterSettingsRequest.Builder()
.includeDefaults(true)
.build();
final GetClusterSettingsResponse response = openSearchClient.cluster().getSettings(request);
final String enabled = getISMEnabled(response);
return enabled != null && enabled.equals("true");
try {
final GetClusterSettingsResponse response = openSearchClient.cluster().getSettings(request);
final String enabled = getISMEnabled(response);
return enabled != null && enabled.equals("true");
} catch (OpenSearchException ex) {
if (NO_ISM_HTTP_STATUS.contains(ex.status())) {
return false;
}
throw ex;
}
}

private String getISMEnabled(final GetClusterSettingsResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,6 @@ private IndexConfiguration(final Builder builder) {
this.documentRootKey = builder.documentRootKey;
}

private void determineTemplateType(Builder builder) {
this.templateType = DistributionVersion.ES6.equals(builder.distributionVersion) ? TemplateType.V1 :
(builder.templateType != null ? builder.templateType : TemplateType.V1);
}

private void determineIndexType(Builder builder) {
if(builder.indexType != null) {
Optional<IndexType> mappedIndexType = IndexType.getByValue(builder.indexType);
Expand All @@ -148,6 +143,15 @@ private void determineIndexType(Builder builder) {
}
}

private void determineTemplateType(Builder builder) {
if (builder.serverless) {
templateType = TemplateType.INDEX_TEMPLATE;
} else {
templateType = DistributionVersion.ES6.equals(builder.distributionVersion) ? TemplateType.V1 :
(builder.templateType != null ? builder.templateType : TemplateType.V1);
}
}

public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetting) {
IndexConfiguration.Builder builder = new IndexConfiguration.Builder();
final String indexAlias = pluginSetting.getStringOrDefault(INDEX_ALIAS, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@

package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import org.apache.http.HttpStatus;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.opensearch.client.ResponseException;
Expand Down Expand Up @@ -336,6 +339,21 @@ void checkISMEnabled_False() throws IOException {
verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class));
}

@ParameterizedTest
@ValueSource(ints = {HttpStatus.SC_NOT_FOUND, HttpStatus.SC_BAD_REQUEST})
void checkISMEnabled_FalseWhenOpenSearchExceptionStatusNonRetryable(final int statusCode) throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy);
when(openSearchException.status()).thenReturn(statusCode);
when(openSearchClusterClient.getSettings(any(GetClusterSettingsRequest.class))).thenThrow(openSearchException);
assertEquals(false, defaultIndexManager.checkISMEnabled());
verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration();
verify(indexConfiguration).getIsmPolicyFile();
verify(indexConfiguration).getIndexAlias();
verify(openSearchClient).cluster();
verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class));
}

@Test
void checkAndCreatePolicy_Normal() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ public void testReadIndexConfig_awsOptionServerlessDefault() {
final Map<String, Object> metadata = initializeConfigMetaData(
null, testIndexAlias, null, null, null, null);
metadata.put(AWS_OPTION, Map.of(SERVERLESS, true));
metadata.put(TEMPLATE_TYPE, TemplateType.V1.getTypeName());
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
assertEquals(IndexType.MANAGEMENT_DISABLED, indexConfiguration.getIndexType());
Expand All @@ -377,10 +378,12 @@ public void testReadIndexConfig_awsServerlessIndexTypeOverride() {
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.CUSTOM.getValue(), testIndexAlias, null, null, null, null);
metadata.put(AWS_OPTION, Map.of(SERVERLESS, true));
metadata.put(TEMPLATE_TYPE, TemplateType.V1.getTypeName());
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
assertEquals(IndexType.CUSTOM, indexConfiguration.getIndexType());
assertEquals(testIndexAlias, indexConfiguration.getIndexAlias());
assertEquals(TemplateType.INDEX_TEMPLATE, indexConfiguration.getTemplateType());
assertEquals(true, indexConfiguration.getServerless());
}

Expand Down
Loading