Skip to content

Commit

Permalink
ENH: support index template for serverless (opensearch-project#3071)
Browse files Browse the repository at this point in the history
* ENH: support index template for serverless

Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 authored Oct 9, 2023
1 parent 0d2e491 commit 73a80a1
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 9 deletions.
2 changes: 1 addition & 1 deletion data-prepper-plugins/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ if `exclude_keys` is set to ["message", "status"], the document written to OpenS
* `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
* `sts_role_arn` (Optional) : The STS role to assume for requests to AWS. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).
* `sts_header_overrides` (Optional): A map of header overrides to make when assuming the IAM role for the sink plugin.
* `serverless` (Optional): A boolean flag to indicate the OpenSearch backend is Amazon OpenSearch Serverless. Default to `false`.
* `serverless` (Optional): A boolean flag to indicate the OpenSearch backend is Amazon OpenSearch Serverless. Default to `false`. Notice that [ISM policies.](https://opensearch.org/docs/latest/im-plugin/ism/policies/) is not supported in Amazon OpenSearch Serverless and thus any ISM related configuration value has no effect, i.e. `ism_policy_file`.

## Metrics
### Management Disabled Index Type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import org.apache.http.HttpStatus;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpenSearchException;
Expand Down Expand Up @@ -39,6 +40,7 @@ public abstract class AbstractIndexManager implements IndexManager {
= "Invalid alias name [%s], an index exists with the same name as the alias";
public static final String INVALID_INDEX_ALIAS_ERROR
= "invalid_index_name_exception";
static final Set<Integer> NO_ISM_HTTP_STATUS = Set.of(HttpStatus.SC_NOT_FOUND, HttpStatus.SC_BAD_REQUEST);
private static final String TIME_PATTERN_STARTING_SYMBOLS = "%{";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
protected RestHighLevelClient restHighLevelClient;
Expand Down Expand Up @@ -178,9 +180,16 @@ final boolean checkISMEnabled() throws IOException {
final GetClusterSettingsRequest request = new GetClusterSettingsRequest.Builder()
.includeDefaults(true)
.build();
final GetClusterSettingsResponse response = openSearchClient.cluster().getSettings(request);
final String enabled = getISMEnabled(response);
return enabled != null && enabled.equals("true");
try {
final GetClusterSettingsResponse response = openSearchClient.cluster().getSettings(request);
final String enabled = getISMEnabled(response);
return enabled != null && enabled.equals("true");
} catch (OpenSearchException ex) {
if (NO_ISM_HTTP_STATUS.contains(ex.status())) {
return false;
}
throw ex;
}
}

private String getISMEnabled(final GetClusterSettingsResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,6 @@ private IndexConfiguration(final Builder builder) {
this.documentRootKey = builder.documentRootKey;
}

private void determineTemplateType(Builder builder) {
this.templateType = DistributionVersion.ES6.equals(builder.distributionVersion) ? TemplateType.V1 :
(builder.templateType != null ? builder.templateType : TemplateType.V1);
}

private void determineIndexType(Builder builder) {
if(builder.indexType != null) {
Optional<IndexType> mappedIndexType = IndexType.getByValue(builder.indexType);
Expand All @@ -167,6 +162,15 @@ private void determineIndexType(Builder builder) {
}
}

private void determineTemplateType(Builder builder) {
if (builder.serverless) {
templateType = TemplateType.INDEX_TEMPLATE;
} else {
templateType = DistributionVersion.ES6.equals(builder.distributionVersion) ? TemplateType.V1 :
(builder.templateType != null ? builder.templateType : TemplateType.V1);
}
}

public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetting) {
return readIndexConfig(pluginSetting, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@

package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import org.apache.http.HttpStatus;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.opensearch.client.ResponseException;
Expand Down Expand Up @@ -336,6 +339,21 @@ void checkISMEnabled_False() throws IOException {
verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class));
}

@ParameterizedTest
@ValueSource(ints = {HttpStatus.SC_NOT_FOUND, HttpStatus.SC_BAD_REQUEST})
void checkISMEnabled_FalseWhenOpenSearchExceptionStatusNonRetryable(final int statusCode) throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy);
when(openSearchException.status()).thenReturn(statusCode);
when(openSearchClusterClient.getSettings(any(GetClusterSettingsRequest.class))).thenThrow(openSearchException);
assertEquals(false, defaultIndexManager.checkISMEnabled());
verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration();
verify(indexConfiguration).getIsmPolicyFile();
verify(indexConfiguration).getIndexAlias();
verify(openSearchClient).cluster();
verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class));
}

@Test
void checkAndCreatePolicy_Normal() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ public void testReadIndexConfig_awsOptionServerlessDefault() {
final Map<String, Object> metadata = initializeConfigMetaData(
null, testIndexAlias, null, null, null, null, null);
metadata.put(AWS_OPTION, Map.of(SERVERLESS, true));
metadata.put(TEMPLATE_TYPE, TemplateType.V1.getTypeName());
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
assertEquals(IndexType.MANAGEMENT_DISABLED, indexConfiguration.getIndexType());
Expand All @@ -421,10 +422,12 @@ public void testReadIndexConfig_awsServerlessIndexTypeOverride() {
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.CUSTOM.getValue(), testIndexAlias, null, null, null, null, null);
metadata.put(AWS_OPTION, Map.of(SERVERLESS, true));
metadata.put(TEMPLATE_TYPE, TemplateType.V1.getTypeName());
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
assertEquals(IndexType.CUSTOM, indexConfiguration.getIndexType());
assertEquals(testIndexAlias, indexConfiguration.getIndexAlias());
assertEquals(TemplateType.INDEX_TEMPLATE, indexConfiguration.getTemplateType());
assertEquals(true, indexConfiguration.getServerless());
}

Expand Down

0 comments on commit 73a80a1

Please sign in to comment.