From 6286388d238af459502fc5d999189506d247eb85 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Mon, 18 Sep 2023 14:14:34 -0500 Subject: [PATCH] =?UTF-8?q?Use=20async=20client=20to=20delete=20scroll=20a?= =?UTF-8?q?nd=20pit=20for=20OpenSearch=20as=20workaroun=E2=80=A6=20(#3338)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use async client to delete scroll and pit for OpenSearch as workaround for bug in client Signed-off-by: Taylor Gray --- .../opensearch-source/build.gradle | 1 + .../worker/client/OpenSearchAccessor.java | 3 ++- .../client/OpenSearchClientFactory.java | 25 +++++++++++-------- .../worker/client/SearchAccessorStrategy.java | 18 +++++++------ .../client/OpenSearchClientFactoryTest.java | 2 -- .../client/SearchAccessStrategyTest.java | 10 ++++---- 6 files changed, 33 insertions(+), 26 deletions(-) diff --git a/data-prepper-plugins/opensearch-source/build.gradle b/data-prepper-plugins/opensearch-source/build.gradle index 1ffb49541b..c080eff561 100644 --- a/data-prepper-plugins/opensearch-source/build.gradle +++ b/data-prepper-plugins/opensearch-source/build.gradle @@ -7,6 +7,7 @@ dependencies { implementation project(':data-prepper-plugins:buffer-common') implementation project(':data-prepper-plugins:aws-plugin-api') implementation 'software.amazon.awssdk:apache-client' + implementation 'software.amazon.awssdk:netty-nio-client' implementation 'io.micrometer:micrometer-core' implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2' diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java index 741bb0f1e5..a7c1fccc93 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java @@ -65,7 +65,8 @@ public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory private final OpenSearchClient openSearchClient; private final SearchContextType searchContextType; - public OpenSearchAccessor(final OpenSearchClient openSearchClient, final SearchContextType searchContextType) { + public OpenSearchAccessor(final OpenSearchClient openSearchClient, + final SearchContextType searchContextType) { this.openSearchClient = openSearchClient; this.searchContextType = searchContextType; } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java index f240f7bb24..f15d70df90 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java @@ -39,8 +39,9 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.signer.Aws4Signer; -import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; @@ -103,7 +104,7 @@ private OpenSearchTransport createOpenSearchTransportForAws(final OpenSearchSour final boolean isServerlessCollection = Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions()) && openSearchSourceConfiguration.getAwsAuthenticationOptions().isServerlessCollection(); - return new AwsSdk2Transport(createSdkHttpClient(openSearchSourceConfiguration), + return new AwsSdk2Transport(createSdkAsyncHttpClient(openSearchSourceConfiguration), HttpHost.create(openSearchSourceConfiguration.getHosts().get(0)).getHostName(), isServerlessCollection ? AOSS_SERVICE_NAME : AOS_SERVICE_NAME, openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion(), @@ -111,22 +112,19 @@ private OpenSearchTransport createOpenSearchTransportForAws(final OpenSearchSour .setCredentials(awsCredentialsProvider) .setMapper(new JacksonJsonpMapper()) .build()); + } - private SdkHttpClient createSdkHttpClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { - final ApacheHttpClient.Builder apacheHttpClientBuilder = ApacheHttpClient.builder(); + public SdkAsyncHttpClient createSdkAsyncHttpClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { + final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder(); if (Objects.nonNull(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout())) { - apacheHttpClientBuilder.connectionTimeout(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout()); - } - - if (Objects.nonNull(openSearchSourceConfiguration.getConnectionConfiguration().getSocketTimeout())) { - apacheHttpClientBuilder.socketTimeout(openSearchSourceConfiguration.getConnectionConfiguration().getSocketTimeout()); + builder.connectionTimeout(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout()); } - attachSSLContext(apacheHttpClientBuilder, openSearchSourceConfiguration); + attachSSLContext(builder, openSearchSourceConfiguration); - return apacheHttpClientBuilder.build(); + return builder.build(); } private RestClient createOpenSearchRestClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { @@ -274,6 +272,11 @@ private void attachSSLContext(final ApacheHttpClient.Builder apacheHttpClientBui apacheHttpClientBuilder.tlsTrustManagersProvider(() -> trustManagers); } + private void attachSSLContext(final NettyNioAsyncHttpClient.Builder asyncClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) { + TrustManager[] trustManagers = createTrustManagers(openSearchSourceConfiguration.getConnectionConfiguration().getCertPath()); + asyncClientBuilder.tlsTrustManagersProvider(() -> trustManagers); + } + private void attachSSLContext(final HttpAsyncClientBuilder httpClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) { final ConnectionConfiguration connectionConfiguration = openSearchSourceConfiguration.getConnectionConfiguration(); diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java index 7dee8ae140..945455cf33 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java @@ -95,13 +95,14 @@ public SearchAccessor getSearchAccessor() { searchContextType = SearchContextType.POINT_IN_TIME; } else { LOG.info("{} distribution, version {} detected. Scroll contexts will be used to search documents. " + - "Upgrade your cluster to at least version {} to use Point in Time APIs instead of scroll.", distribution, versionNumber, - distribution.equals(ELASTICSEARCH_DISTRIBUTION) ? ELASTICSEARCH_POINT_IN_TIME_SUPPORT_VERSION_CUTOFF : OPENSEARCH_POINT_IN_TIME_SUPPORT_VERSION_CUTOFF); + "Upgrade your cluster to at least OpenSearch {} to use Point in Time APIs instead of scroll.", distribution, versionNumber, + OPENSEARCH_POINT_IN_TIME_SUPPORT_VERSION_CUTOFF); searchContextType = SearchContextType.SCROLL; } if (Objects.isNull(elasticsearchClient)) { - return new OpenSearchAccessor(openSearchClient, searchContextType); + return new OpenSearchAccessor(openSearchClient, + searchContextType); } return new ElasticsearchAccessor(elasticsearchClient, searchContextType); @@ -110,14 +111,17 @@ public SearchAccessor getSearchAccessor() { private SearchAccessor createSearchAccessorForServerlessCollection(final OpenSearchClient openSearchClient) { if (Objects.isNull(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) { LOG.info("Configured with AOS serverless flag as true, defaulting to search_context_type as 'none', which uses search_after"); - return new OpenSearchAccessor(openSearchClient, SearchContextType.NONE); + return new OpenSearchAccessor(openSearchClient, + SearchContextType.NONE); } else { - if (SearchContextType.POINT_IN_TIME.equals(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) { - throw new InvalidPluginConfigurationException("A search_context_type of point_in_time is not supported for serverless collections"); + if (SearchContextType.POINT_IN_TIME.equals(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType()) || + SearchContextType.SCROLL.equals(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) { + throw new InvalidPluginConfigurationException("A search_context_type of point_in_time or scroll is not supported for serverless collections"); } LOG.info("Using search_context_type set in the config: '{}'", openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType().toString().toLowerCase()); - return new OpenSearchAccessor(openSearchClient, openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType()); + return new OpenSearchAccessor(openSearchClient, + openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType()); } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java index 54497b2ce7..097d0e1250 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java @@ -125,7 +125,6 @@ void provideElasticSearchClient_with_aws_auth() { @Test void provideOpenSearchClient_with_aws_auth() { when(connectionConfiguration.getCertPath()).thenReturn(null); - when(connectionConfiguration.getSocketTimeout()).thenReturn(null); when(connectionConfiguration.getConnectTimeout()).thenReturn(null); final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class); @@ -187,7 +186,6 @@ void provideOpenSearchClient_with_auth_disabled() { @Test void provideOpenSearchClient_with_aws_auth_and_serverless_flag_true() { when(connectionConfiguration.getCertPath()).thenReturn(null); - when(connectionConfiguration.getSocketTimeout()).thenReturn(null); when(connectionConfiguration.getConnectTimeout()).thenReturn(null); final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class); diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java index 203c1e984c..c7c7331dab 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java @@ -64,7 +64,6 @@ void testHappyPath_for_different_point_in_time_versions_for_opensearch(final Str final OpenSearchClient openSearchClient = mock(OpenSearchClient.class); when(openSearchClient.info()).thenReturn(infoResponse); when(openSearchClientFactory.provideOpenSearchClient(openSearchSourceConfiguration)).thenReturn(openSearchClient); - final SearchAccessor searchAccessor = createObjectUnderTest().getSearchAccessor(); assertThat(searchAccessor, notNullValue()); assertThat(searchAccessor.getSearchContextType(), equalTo(SearchContextType.POINT_IN_TIME)); @@ -209,15 +208,16 @@ void serverless_flag_true_defaults_to_search_context_type_none() { assertThat(searchAccessor.getSearchContextType(), equalTo(SearchContextType.NONE)); } - @Test - void serverless_flag_true_throws_InvalidPluginConfiguration_if_search_context_type_is_point_in_time() { + @ParameterizedTest + @ValueSource(strings = {"POINT_IN_TIME", "SCROLL"}) + void serverless_flag_true_throws_InvalidPluginConfiguration_if_search_context_type_is_point_in_time_or_scroll(final String searchContextType) { final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class); when(awsAuthenticationConfiguration.isServerlessCollection()).thenReturn(true); when(openSearchSourceConfiguration.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationConfiguration); final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); - when(searchConfiguration.getSearchContextType()).thenReturn(SearchContextType.POINT_IN_TIME); + when(searchConfiguration.getSearchContextType()).thenReturn(SearchContextType.valueOf(searchContextType)); when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); final SearchAccessorStrategy objectUnderTest = createObjectUnderTest(); @@ -226,7 +226,7 @@ void serverless_flag_true_throws_InvalidPluginConfiguration_if_search_context_ty } @ParameterizedTest - @ValueSource(strings = {"NONE", "SCROLL"}) + @ValueSource(strings = {"NONE"}) void serverless_flag_true_uses_search_context_type_from_config(final String searchContextType) { final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class);