From 19a965089e3dc7e4eac675b5ce3a0a5a0fd68e26 Mon Sep 17 00:00:00 2001 From: George Chen Date: Wed, 26 Jul 2023 16:04:24 -0500 Subject: [PATCH 1/3] ENH: support index template for serverless Signed-off-by: George Chen --- data-prepper-plugins/opensearch/README.md | 2 +- .../opensearch/index/AbstractIndexManager.java | 15 ++++++++++++--- .../opensearch/index/IndexConfiguration.java | 12 ++++++++++-- .../index/DefaultIndexManagerTests.java | 18 ++++++++++++++++++ .../index/IndexConfigurationTests.java | 3 +++ 5 files changed, 44 insertions(+), 6 deletions(-) diff --git a/data-prepper-plugins/opensearch/README.md b/data-prepper-plugins/opensearch/README.md index 66c75f6f57..7a6c1c3e73 100644 --- a/data-prepper-plugins/opensearch/README.md +++ b/data-prepper-plugins/opensearch/README.md @@ -216,7 +216,7 @@ With the `document_root_key` set to `status`. The document structure would be `{ * `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). * `sts_role_arn` (Optional) : The STS role to assume for requests to AWS. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). * `sts_header_overrides` (Optional): A map of header overrides to make when assuming the IAM role for the sink plugin. -* `serverless` (Optional): A boolean flag to indicate the OpenSearch backend is Amazon OpenSearch Serverless. Default to `false`. +* `serverless` (Optional): A boolean flag to indicate the OpenSearch backend is Amazon OpenSearch Serverless. Default to `false`. Notice that [ISM policies.](https://opensearch.org/docs/latest/im-plugin/ism/policies/) is not supported in Amazon OpenSearch Serverless and thus any ISM related configuration value has no effect, i.e. `ism_policy_file`. ## Metrics ### Management Disabled Index Type diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java index fa633d60d3..2bb49258de 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; +import org.apache.http.HttpStatus; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch._types.OpenSearchException; @@ -39,6 +40,7 @@ public abstract class AbstractIndexManager implements IndexManager { = "Invalid alias name [%s], an index exists with the same name as the alias"; public static final String INVALID_INDEX_ALIAS_ERROR = "invalid_index_name_exception"; + static final Set NON_RETRYABLE_HTTP_STATUS = Set.of(HttpStatus.SC_NOT_FOUND, HttpStatus.SC_BAD_REQUEST); private static final String TIME_PATTERN_STARTING_SYMBOLS = "%{"; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); protected RestHighLevelClient restHighLevelClient; @@ -178,9 +180,16 @@ final boolean checkISMEnabled() throws IOException { final GetClusterSettingsRequest request = new GetClusterSettingsRequest.Builder() .includeDefaults(true) .build(); - final GetClusterSettingsResponse response = openSearchClient.cluster().getSettings(request); - final String enabled = getISMEnabled(response); - return enabled != null && enabled.equals("true"); + try { + final GetClusterSettingsResponse response = openSearchClient.cluster().getSettings(request); + final String enabled = getISMEnabled(response); + return enabled != null && enabled.equals("true"); + } catch (OpenSearchException ex) { + if (NON_RETRYABLE_HTTP_STATUS.contains(ex.status())) { + return false; + } + throw ex; + } } private String getISMEnabled(final GetClusterSettingsResponse response) { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index 0cf6221302..05b90afbc1 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -57,7 +57,7 @@ public class IndexConfiguration { public static final String DOCUMENT_ROOT_KEY = "document_root_key"; private IndexType indexType; - private final TemplateType templateType; + private TemplateType templateType; private final String indexAlias; private final Map indexTemplate; private final String documentIdField; @@ -90,7 +90,7 @@ private IndexConfiguration(final Builder builder) { this.s3AwsExternalId = builder.s3AwsStsExternalId; this.s3Client = builder.s3Client; - this.templateType = builder.templateType != null ? builder.templateType : TemplateType.V1; + determineTemplateType(builder); this.indexTemplate = readIndexTemplate(builder.templateFile, indexType, templateType); if (builder.numReplicas > 0) { @@ -143,6 +143,14 @@ private void determineIndexType(Builder builder) { } } + private void determineTemplateType(Builder builder) { + if (builder.serverless) { + templateType = TemplateType.INDEX_TEMPLATE; + } else { + templateType = builder.templateType != null ? builder.templateType : TemplateType.V1; + } + } + public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetting) { IndexConfiguration.Builder builder = new IndexConfiguration.Builder(); final String indexAlias = pluginSetting.getStringOrDefault(INDEX_ALIAS, null); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java index 7099c7c924..6522a93e98 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java @@ -5,9 +5,12 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.index; +import org.apache.http.HttpStatus; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.opensearch.client.ResponseException; @@ -336,6 +339,21 @@ void checkISMEnabled_False() throws IOException { verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class)); } + @ParameterizedTest + @ValueSource(ints = {HttpStatus.SC_NOT_FOUND, HttpStatus.SC_BAD_REQUEST}) + void checkISMEnabled_FalseWhenOpenSearchExceptionStatusNonRetryable(final int statusCode) throws IOException { + defaultIndexManager = indexManagerFactory.getIndexManager( + IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy); + when(openSearchException.status()).thenReturn(statusCode); + when(openSearchClusterClient.getSettings(any(GetClusterSettingsRequest.class))).thenThrow(openSearchException); + assertEquals(false, defaultIndexManager.checkISMEnabled()); + verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration(); + verify(indexConfiguration).getIsmPolicyFile(); + verify(indexConfiguration).getIndexAlias(); + verify(openSearchClient).cluster(); + verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class)); + } + @Test void checkAndCreatePolicy_Normal() throws IOException { defaultIndexManager = indexManagerFactory.getIndexManager( diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java index e1da13867f..8423698d8a 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java @@ -365,6 +365,7 @@ public void testReadIndexConfig_awsOptionServerlessDefault() { final Map metadata = initializeConfigMetaData( null, testIndexAlias, null, null, null, null); metadata.put(AWS_OPTION, Map.of(SERVERLESS, true)); + metadata.put(TEMPLATE_TYPE, TemplateType.V1.getTypeName()); final PluginSetting pluginSetting = getPluginSetting(metadata); final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); assertEquals(IndexType.MANAGEMENT_DISABLED, indexConfiguration.getIndexType()); @@ -377,10 +378,12 @@ public void testReadIndexConfig_awsServerlessIndexTypeOverride() { final Map metadata = initializeConfigMetaData( IndexType.CUSTOM.getValue(), testIndexAlias, null, null, null, null); metadata.put(AWS_OPTION, Map.of(SERVERLESS, true)); + metadata.put(TEMPLATE_TYPE, TemplateType.V1.getTypeName()); final PluginSetting pluginSetting = getPluginSetting(metadata); final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); assertEquals(IndexType.CUSTOM, indexConfiguration.getIndexType()); assertEquals(testIndexAlias, indexConfiguration.getIndexAlias()); + assertEquals(TemplateType.INDEX_TEMPLATE, indexConfiguration.getTemplateType()); assertEquals(true, indexConfiguration.getServerless()); } From e11e0135051803a8b097c7600718ec73bc16f3f0 Mon Sep 17 00:00:00 2001 From: George Chen Date: Mon, 7 Aug 2023 17:51:16 -0500 Subject: [PATCH 2/3] MAINT: rename variable Signed-off-by: George Chen --- .../plugins/sink/opensearch/index/AbstractIndexManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java index 2bb49258de..2aca8b0275 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java @@ -40,7 +40,7 @@ public abstract class AbstractIndexManager implements IndexManager { = "Invalid alias name [%s], an index exists with the same name as the alias"; public static final String INVALID_INDEX_ALIAS_ERROR = "invalid_index_name_exception"; - static final Set NON_RETRYABLE_HTTP_STATUS = Set.of(HttpStatus.SC_NOT_FOUND, HttpStatus.SC_BAD_REQUEST); + static final Set NO_ISM_HTTP_STATUS = Set.of(HttpStatus.SC_NOT_FOUND, HttpStatus.SC_BAD_REQUEST); private static final String TIME_PATTERN_STARTING_SYMBOLS = "%{"; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); protected RestHighLevelClient restHighLevelClient; @@ -185,7 +185,7 @@ final boolean checkISMEnabled() throws IOException { final String enabled = getISMEnabled(response); return enabled != null && enabled.equals("true"); } catch (OpenSearchException ex) { - if (NON_RETRYABLE_HTTP_STATUS.contains(ex.status())) { + if (NO_ISM_HTTP_STATUS.contains(ex.status())) { return false; } throw ex; From 62004111b6dbe86eaabf158cc75da29a936dbce4 Mon Sep 17 00:00:00 2001 From: George Chen Date: Tue, 8 Aug 2023 09:58:45 -0500 Subject: [PATCH 3/3] MAINT: resolve conflict method Signed-off-by: George Chen --- .../plugins/sink/opensearch/index/IndexConfiguration.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index 1558702a77..b4aed22211 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -130,11 +130,6 @@ private IndexConfiguration(final Builder builder) { this.documentRootKey = builder.documentRootKey; } - private void determineTemplateType(Builder builder) { - this.templateType = DistributionVersion.ES6.equals(builder.distributionVersion) ? TemplateType.V1 : - (builder.templateType != null ? builder.templateType : TemplateType.V1); - } - private void determineIndexType(Builder builder) { if(builder.indexType != null) { Optional mappedIndexType = IndexType.getByValue(builder.indexType); @@ -152,7 +147,8 @@ private void determineTemplateType(Builder builder) { if (builder.serverless) { templateType = TemplateType.INDEX_TEMPLATE; } else { - templateType = builder.templateType != null ? builder.templateType : TemplateType.V1; + templateType = DistributionVersion.ES6.equals(builder.distributionVersion) ? TemplateType.V1 : + (builder.templateType != null ? builder.templateType : TemplateType.V1); } }