Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: support index template for serverless #3071

Merged
merged 4 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion data-prepper-plugins/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ With the `document_root_key` set to `status`. The document structure would be `{
* `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
* `sts_role_arn` (Optional) : The STS role to assume for requests to AWS. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).
* `sts_header_overrides` (Optional): A map of header overrides to make when assuming the IAM role for the sink plugin.
* `serverless` (Optional): A boolean flag to indicate the OpenSearch backend is Amazon OpenSearch Serverless. Default to `false`.
* `serverless` (Optional): A boolean flag to indicate the OpenSearch backend is Amazon OpenSearch Serverless. Default to `false`. Notice that [ISM policies.](https://opensearch.org/docs/latest/im-plugin/ism/policies/) is not supported in Amazon OpenSearch Serverless and thus any ISM related configuration value has no effect, i.e. `ism_policy_file`.

## Metrics
### Management Disabled Index Type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import org.apache.http.HttpStatus;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpenSearchException;
Expand Down Expand Up @@ -39,6 +40,7 @@ public abstract class AbstractIndexManager implements IndexManager {
= "Invalid alias name [%s], an index exists with the same name as the alias";
public static final String INVALID_INDEX_ALIAS_ERROR
= "invalid_index_name_exception";
static final Set<Integer> NON_RETRYABLE_HTTP_STATUS = Set.of(HttpStatus.SC_NOT_FOUND, HttpStatus.SC_BAD_REQUEST);
dlvenable marked this conversation as resolved.
Show resolved Hide resolved
private static final String TIME_PATTERN_STARTING_SYMBOLS = "%{";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
protected RestHighLevelClient restHighLevelClient;
Expand Down Expand Up @@ -178,9 +180,16 @@ final boolean checkISMEnabled() throws IOException {
final GetClusterSettingsRequest request = new GetClusterSettingsRequest.Builder()
.includeDefaults(true)
.build();
final GetClusterSettingsResponse response = openSearchClient.cluster().getSettings(request);
final String enabled = getISMEnabled(response);
return enabled != null && enabled.equals("true");
try {
final GetClusterSettingsResponse response = openSearchClient.cluster().getSettings(request);
final String enabled = getISMEnabled(response);
return enabled != null && enabled.equals("true");
} catch (OpenSearchException ex) {
if (NON_RETRYABLE_HTTP_STATUS.contains(ex.status())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a stronger check here? This has any non-retryable indicating the ISM is disabled. What if the user lacks access? We might want that error to halt the sink.

return false;
}
throw ex;
}
}

private String getISMEnabled(final GetClusterSettingsResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class IndexConfiguration {
public static final String DOCUMENT_ROOT_KEY = "document_root_key";

private IndexType indexType;
private final TemplateType templateType;
private TemplateType templateType;
private final String indexAlias;
private final Map<String, Object> indexTemplate;
private final String documentIdField;
Expand Down Expand Up @@ -90,7 +90,7 @@ private IndexConfiguration(final Builder builder) {
this.s3AwsExternalId = builder.s3AwsStsExternalId;
this.s3Client = builder.s3Client;

this.templateType = builder.templateType != null ? builder.templateType : TemplateType.V1;
determineTemplateType(builder);
this.indexTemplate = readIndexTemplate(builder.templateFile, indexType, templateType);

if (builder.numReplicas > 0) {
Expand Down Expand Up @@ -143,6 +143,14 @@ private void determineIndexType(Builder builder) {
}
}

private void determineTemplateType(Builder builder) {
if (builder.serverless) {
templateType = TemplateType.INDEX_TEMPLATE;
} else {
templateType = builder.templateType != null ? builder.templateType : TemplateType.V1;
}
}

public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetting) {
IndexConfiguration.Builder builder = new IndexConfiguration.Builder();
final String indexAlias = pluginSetting.getStringOrDefault(INDEX_ALIAS, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@

package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import org.apache.http.HttpStatus;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.opensearch.client.ResponseException;
Expand Down Expand Up @@ -336,6 +339,21 @@ void checkISMEnabled_False() throws IOException {
verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class));
}

@ParameterizedTest
@ValueSource(ints = {HttpStatus.SC_NOT_FOUND, HttpStatus.SC_BAD_REQUEST})
void checkISMEnabled_FalseWhenOpenSearchExceptionStatusNonRetryable(final int statusCode) throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy);
when(openSearchException.status()).thenReturn(statusCode);
when(openSearchClusterClient.getSettings(any(GetClusterSettingsRequest.class))).thenThrow(openSearchException);
assertEquals(false, defaultIndexManager.checkISMEnabled());
verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration();
verify(indexConfiguration).getIsmPolicyFile();
verify(indexConfiguration).getIndexAlias();
verify(openSearchClient).cluster();
verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class));
}

@Test
void checkAndCreatePolicy_Normal() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ public void testReadIndexConfig_awsOptionServerlessDefault() {
final Map<String, Object> metadata = initializeConfigMetaData(
null, testIndexAlias, null, null, null, null);
metadata.put(AWS_OPTION, Map.of(SERVERLESS, true));
metadata.put(TEMPLATE_TYPE, TemplateType.V1.getTypeName());
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
assertEquals(IndexType.MANAGEMENT_DISABLED, indexConfiguration.getIndexType());
Expand All @@ -377,10 +378,12 @@ public void testReadIndexConfig_awsServerlessIndexTypeOverride() {
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.CUSTOM.getValue(), testIndexAlias, null, null, null, null);
metadata.put(AWS_OPTION, Map.of(SERVERLESS, true));
metadata.put(TEMPLATE_TYPE, TemplateType.V1.getTypeName());
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
assertEquals(IndexType.CUSTOM, indexConfiguration.getIndexType());
assertEquals(testIndexAlias, indexConfiguration.getIndexAlias());
assertEquals(TemplateType.INDEX_TEMPLATE, indexConfiguration.getTemplateType());
assertEquals(true, indexConfiguration.getServerless());
}

Expand Down
Loading