diff --git a/data-prepper-plugins/opensearch/README.md b/data-prepper-plugins/opensearch/README.md index f40f1ce0b0..6e4fc6b919 100644 --- a/data-prepper-plugins/opensearch/README.md +++ b/data-prepper-plugins/opensearch/README.md @@ -216,7 +216,7 @@ With the `document_root_key` set to `status`. The document structure would be `{ * `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). * `sts_role_arn` (Optional) : The STS role to assume for requests to AWS. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). * `sts_header_overrides` (Optional): A map of header overrides to make when assuming the IAM role for the sink plugin. -* `serverless` (Optional): A boolean flag to indicate the OpenSearch backend is Amazon OpenSearch Serverless. Default to `false`. +* `serverless` (Optional): A boolean flag to indicate the OpenSearch backend is Amazon OpenSearch Serverless. Default to `false`. Notice that [ISM policies.](https://opensearch.org/docs/latest/im-plugin/ism/policies/) is not supported in Amazon OpenSearch Serverless and thus any ISM related configuration value has no effect, i.e. `ism_policy_file`. ## Metrics ### Management Disabled Index Type diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java index fa633d60d3..2aca8b0275 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; +import org.apache.http.HttpStatus; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch._types.OpenSearchException; @@ -39,6 +40,7 @@ public abstract class AbstractIndexManager implements IndexManager { = "Invalid alias name [%s], an index exists with the same name as the alias"; public static final String INVALID_INDEX_ALIAS_ERROR = "invalid_index_name_exception"; + static final Set NO_ISM_HTTP_STATUS = Set.of(HttpStatus.SC_NOT_FOUND, HttpStatus.SC_BAD_REQUEST); private static final String TIME_PATTERN_STARTING_SYMBOLS = "%{"; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); protected RestHighLevelClient restHighLevelClient; @@ -178,9 +180,16 @@ final boolean checkISMEnabled() throws IOException { final GetClusterSettingsRequest request = new GetClusterSettingsRequest.Builder() .includeDefaults(true) .build(); - final GetClusterSettingsResponse response = openSearchClient.cluster().getSettings(request); - final String enabled = getISMEnabled(response); - return enabled != null && enabled.equals("true"); + try { + final GetClusterSettingsResponse response = openSearchClient.cluster().getSettings(request); + final String enabled = getISMEnabled(response); + return enabled != null && enabled.equals("true"); + } catch (OpenSearchException ex) { + if (NO_ISM_HTTP_STATUS.contains(ex.status())) { + return false; + } + throw ex; + } } private String getISMEnabled(final GetClusterSettingsResponse response) { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index ccc2e1c951..b4aed22211 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -130,11 +130,6 @@ private IndexConfiguration(final Builder builder) { this.documentRootKey = builder.documentRootKey; } - private void determineTemplateType(Builder builder) { - this.templateType = DistributionVersion.ES6.equals(builder.distributionVersion) ? TemplateType.V1 : - (builder.templateType != null ? builder.templateType : TemplateType.V1); - } - private void determineIndexType(Builder builder) { if(builder.indexType != null) { Optional mappedIndexType = IndexType.getByValue(builder.indexType); @@ -148,6 +143,15 @@ private void determineIndexType(Builder builder) { } } + private void determineTemplateType(Builder builder) { + if (builder.serverless) { + templateType = TemplateType.INDEX_TEMPLATE; + } else { + templateType = DistributionVersion.ES6.equals(builder.distributionVersion) ? TemplateType.V1 : + (builder.templateType != null ? builder.templateType : TemplateType.V1); + } + } + public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetting) { IndexConfiguration.Builder builder = new IndexConfiguration.Builder(); final String indexAlias = pluginSetting.getStringOrDefault(INDEX_ALIAS, null); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java index 7099c7c924..6522a93e98 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java @@ -5,9 +5,12 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.index; +import org.apache.http.HttpStatus; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.opensearch.client.ResponseException; @@ -336,6 +339,21 @@ void checkISMEnabled_False() throws IOException { verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class)); } + @ParameterizedTest + @ValueSource(ints = {HttpStatus.SC_NOT_FOUND, HttpStatus.SC_BAD_REQUEST}) + void checkISMEnabled_FalseWhenOpenSearchExceptionStatusNonRetryable(final int statusCode) throws IOException { + defaultIndexManager = indexManagerFactory.getIndexManager( + IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy); + when(openSearchException.status()).thenReturn(statusCode); + when(openSearchClusterClient.getSettings(any(GetClusterSettingsRequest.class))).thenThrow(openSearchException); + assertEquals(false, defaultIndexManager.checkISMEnabled()); + verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration(); + verify(indexConfiguration).getIsmPolicyFile(); + verify(indexConfiguration).getIndexAlias(); + verify(openSearchClient).cluster(); + verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class)); + } + @Test void checkAndCreatePolicy_Normal() throws IOException { defaultIndexManager = indexManagerFactory.getIndexManager( diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java index 64810bf1f2..30f811366a 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java @@ -365,6 +365,7 @@ public void testReadIndexConfig_awsOptionServerlessDefault() { final Map metadata = initializeConfigMetaData( null, testIndexAlias, null, null, null, null); metadata.put(AWS_OPTION, Map.of(SERVERLESS, true)); + metadata.put(TEMPLATE_TYPE, TemplateType.V1.getTypeName()); final PluginSetting pluginSetting = getPluginSetting(metadata); final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); assertEquals(IndexType.MANAGEMENT_DISABLED, indexConfiguration.getIndexType()); @@ -377,10 +378,12 @@ public void testReadIndexConfig_awsServerlessIndexTypeOverride() { final Map metadata = initializeConfigMetaData( IndexType.CUSTOM.getValue(), testIndexAlias, null, null, null, null); metadata.put(AWS_OPTION, Map.of(SERVERLESS, true)); + metadata.put(TEMPLATE_TYPE, TemplateType.V1.getTypeName()); final PluginSetting pluginSetting = getPluginSetting(metadata); final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); assertEquals(IndexType.CUSTOM, indexConfiguration.getIndexType()); assertEquals(testIndexAlias, indexConfiguration.getIndexAlias()); + assertEquals(TemplateType.INDEX_TEMPLATE, indexConfiguration.getTemplateType()); assertEquals(true, indexConfiguration.getServerless()); }