From 4038673c50f2a4f6386b5924e22d19b1ce585562 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Fri, 15 Sep 2023 15:36:54 -0500 Subject: [PATCH] Swap opensearch source sdk client from Apache to Netty Signed-off-by: Taylor Gray --- .../worker/client/OpenSearchAccessor.java | 7 +-- .../client/OpenSearchClientFactory.java | 59 ++++--------------- .../worker/client/SearchAccessorStrategy.java | 12 ++-- .../worker/client/OpenSearchAccessorTest.java | 17 +++--- .../client/OpenSearchClientFactoryTest.java | 50 ---------------- .../client/SearchAccessStrategyTest.java | 13 ++-- 6 files changed, 29 insertions(+), 129 deletions(-) diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java index 1a5b672525..a7c1fccc93 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java @@ -63,15 +63,12 @@ public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory static final String SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE = "Trying to create too many scroll contexts"; private final OpenSearchClient openSearchClient; - private final OpenSearchClient openSearchAsyncClient; private final SearchContextType searchContextType; public OpenSearchAccessor(final OpenSearchClient openSearchClient, - final OpenSearchClient asyncOpenSearchClient, final SearchContextType searchContextType) { this.openSearchClient = openSearchClient; this.searchContextType = searchContextType; - this.openSearchAsyncClient = asyncOpenSearchClient; } @Override @@ -130,7 +127,7 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest @Override public void deletePit(final DeletePointInTimeRequest deletePointInTimeRequest) { try { - final DeletePitResponse deletePitResponse = openSearchAsyncClient.deletePit(DeletePitRequest.of(builder -> builder.pitId(Collections.singletonList(deletePointInTimeRequest.getPitId())))); + final DeletePitResponse deletePitResponse = openSearchClient.deletePit(DeletePitRequest.of(builder -> builder.pitId(Collections.singletonList(deletePointInTimeRequest.getPitId())))); if (isPitDeletedSuccessfully(deletePitResponse)) { LOG.debug("Successfully deleted point in time id {}", deletePointInTimeRequest.getPitId()); } else { @@ -197,7 +194,7 @@ public SearchScrollResponse searchWithScroll(final SearchScrollRequest searchScr @Override public void deleteScroll(final DeleteScrollRequest deleteScrollRequest) { try { - final ClearScrollResponse clearScrollResponse = openSearchAsyncClient.clearScroll(ClearScrollRequest.of(request -> request.scrollId(deleteScrollRequest.getScrollId()))); + final ClearScrollResponse clearScrollResponse = openSearchClient.clearScroll(ClearScrollRequest.of(request -> request.scrollId(deleteScrollRequest.getScrollId()))); if (clearScrollResponse.succeeded()) { LOG.debug("Successfully deleted scroll context with id {}", deleteScrollRequest.getScrollId()); } else { diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java index b513a67c04..f15d70df90 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java @@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.signer.Aws4Signer; -import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; @@ -76,18 +75,7 @@ private OpenSearchClientFactory(final AwsCredentialsSupplier awsCredentialsSuppl public OpenSearchClient provideOpenSearchClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { OpenSearchTransport transport; if (Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions())) { - transport = createOpenSearchTransportForAws(openSearchSourceConfiguration, false); - } else { - final RestClient restClient = createOpenSearchRestClient(openSearchSourceConfiguration); - transport = createOpenSearchTransport(restClient); - } - return new OpenSearchClient(transport); - } - - public OpenSearchClient provideOpenSearchAsyncClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { - OpenSearchTransport transport; - if (Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions())) { - transport = createOpenSearchTransportForAws(openSearchSourceConfiguration, true); + transport = createOpenSearchTransportForAws(openSearchSourceConfiguration); } else { final RestClient restClient = createOpenSearchRestClient(openSearchSourceConfiguration); transport = createOpenSearchTransport(restClient); @@ -105,7 +93,7 @@ private OpenSearchTransport createOpenSearchTransport(final RestClient restClien return new RestClientTransport(restClient, new JacksonJsonpMapper()); } - private OpenSearchTransport createOpenSearchTransportForAws(final OpenSearchSourceConfiguration openSearchSourceConfiguration, final boolean async) { + private OpenSearchTransport createOpenSearchTransportForAws(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder() .withRegion(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion()) .withStsRoleArn(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsStsRoleArn()) @@ -116,42 +104,15 @@ private OpenSearchTransport createOpenSearchTransportForAws(final OpenSearchSour final boolean isServerlessCollection = Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions()) && openSearchSourceConfiguration.getAwsAuthenticationOptions().isServerlessCollection(); - if (!async) { - return new AwsSdk2Transport(createSdkHttpClient(openSearchSourceConfiguration), - HttpHost.create(openSearchSourceConfiguration.getHosts().get(0)).getHostName(), - isServerlessCollection ? AOSS_SERVICE_NAME : AOS_SERVICE_NAME, - openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion(), - AwsSdk2TransportOptions.builder() - .setCredentials(awsCredentialsProvider) - .setMapper(new JacksonJsonpMapper()) - .build()); - } else { - return new AwsSdk2Transport(createSdkAsyncHttpClient(openSearchSourceConfiguration), - HttpHost.create(openSearchSourceConfiguration.getHosts().get(0)).getHostName(), - isServerlessCollection ? AOSS_SERVICE_NAME : AOS_SERVICE_NAME, - openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion(), - AwsSdk2TransportOptions.builder() - .setCredentials(awsCredentialsProvider) - .setMapper(new JacksonJsonpMapper()) - .build()); - } - - } - - private SdkHttpClient createSdkHttpClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { - final ApacheHttpClient.Builder apacheHttpClientBuilder = ApacheHttpClient.builder(); - - if (Objects.nonNull(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout())) { - apacheHttpClientBuilder.connectionTimeout(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout()); - } - - if (Objects.nonNull(openSearchSourceConfiguration.getConnectionConfiguration().getSocketTimeout())) { - apacheHttpClientBuilder.socketTimeout(openSearchSourceConfiguration.getConnectionConfiguration().getSocketTimeout()); - } - - attachSSLContext(apacheHttpClientBuilder, openSearchSourceConfiguration); + return new AwsSdk2Transport(createSdkAsyncHttpClient(openSearchSourceConfiguration), + HttpHost.create(openSearchSourceConfiguration.getHosts().get(0)).getHostName(), + isServerlessCollection ? AOSS_SERVICE_NAME : AOS_SERVICE_NAME, + openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion(), + AwsSdk2TransportOptions.builder() + .setCredentials(awsCredentialsProvider) + .setMapper(new JacksonJsonpMapper()) + .build()); - return apacheHttpClientBuilder.build(); } public SdkAsyncHttpClient createSdkAsyncHttpClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java index 9d2b6bfd3b..945455cf33 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java @@ -95,14 +95,13 @@ public SearchAccessor getSearchAccessor() { searchContextType = SearchContextType.POINT_IN_TIME; } else { LOG.info("{} distribution, version {} detected. Scroll contexts will be used to search documents. " + - "Upgrade your cluster to at least version {} to use Point in Time APIs instead of scroll.", distribution, versionNumber, - distribution.equals(ELASTICSEARCH_DISTRIBUTION) ? ELASTICSEARCH_POINT_IN_TIME_SUPPORT_VERSION_CUTOFF : OPENSEARCH_POINT_IN_TIME_SUPPORT_VERSION_CUTOFF); + "Upgrade your cluster to at least OpenSearch {} to use Point in Time APIs instead of scroll.", distribution, versionNumber, + OPENSEARCH_POINT_IN_TIME_SUPPORT_VERSION_CUTOFF); searchContextType = SearchContextType.SCROLL; } if (Objects.isNull(elasticsearchClient)) { return new OpenSearchAccessor(openSearchClient, - openSearchClientFactory.provideOpenSearchAsyncClient(openSearchSourceConfiguration), searchContextType); } @@ -113,16 +112,15 @@ private SearchAccessor createSearchAccessorForServerlessCollection(final OpenSea if (Objects.isNull(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) { LOG.info("Configured with AOS serverless flag as true, defaulting to search_context_type as 'none', which uses search_after"); return new OpenSearchAccessor(openSearchClient, - openSearchClientFactory.provideOpenSearchAsyncClient(openSearchSourceConfiguration), SearchContextType.NONE); } else { - if (SearchContextType.POINT_IN_TIME.equals(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) { - throw new InvalidPluginConfigurationException("A search_context_type of point_in_time is not supported for serverless collections"); + if (SearchContextType.POINT_IN_TIME.equals(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType()) || + SearchContextType.SCROLL.equals(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) { + throw new InvalidPluginConfigurationException("A search_context_type of point_in_time or scroll is not supported for serverless collections"); } LOG.info("Using search_context_type set in the config: '{}'", openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType().toString().toLowerCase()); return new OpenSearchAccessor(openSearchClient, - openSearchClientFactory.provideOpenSearchAsyncClient(openSearchSourceConfiguration), openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType()); } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java index b4c3e4adcb..69d0b68a5a 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java @@ -70,11 +70,8 @@ public class OpenSearchAccessorTest { @Mock private OpenSearchClient openSearchClient; - @Mock - private OpenSearchClient asyncOpenSearchClient; - private SearchAccessor createObjectUnderTest() { - return new OpenSearchAccessor(openSearchClient, asyncOpenSearchClient, SearchContextType.POINT_IN_TIME); + return new OpenSearchAccessor(openSearchClient, SearchContextType.POINT_IN_TIME); } @Test @@ -352,7 +349,7 @@ void delete_pit_with_no_exception_does_not_throw(final boolean successful) throw when(deletePitRecord.successful()).thenReturn(successful); when(deletePitResponse.pits()).thenReturn(Collections.singletonList(deletePitRecord)); - when(asyncOpenSearchClient.deletePit(any(DeletePitRequest.class))).thenReturn(deletePitResponse); + when(openSearchClient.deletePit(any(DeletePitRequest.class))).thenReturn(deletePitResponse); createObjectUnderTest().deletePit(deletePointInTimeRequest); } @@ -369,7 +366,7 @@ void delete_scroll_with_no_exception_does_not_throw(final boolean successful) th when(clearScrollResponse.succeeded()).thenReturn(successful); - when(asyncOpenSearchClient.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollResponse); + when(openSearchClient.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollResponse); createObjectUnderTest().deleteScroll(deleteScrollRequest); } @@ -381,7 +378,7 @@ void delete_pit_does_not_throw_during_opensearch_exception() throws IOException final DeletePointInTimeRequest deletePointInTimeRequest = mock(DeletePointInTimeRequest.class); when(deletePointInTimeRequest.getPitId()).thenReturn(pitId); - when(asyncOpenSearchClient.deletePit(any(DeletePitRequest.class))).thenThrow(OpenSearchException.class); + when(openSearchClient.deletePit(any(DeletePitRequest.class))).thenThrow(OpenSearchException.class); createObjectUnderTest().deletePit(deletePointInTimeRequest); } @@ -394,7 +391,7 @@ void delete_scroll_does_not_throw_during_opensearch_exception() throws IOExcepti when(deleteScrollRequest.getScrollId()).thenReturn(scrollId); - when(asyncOpenSearchClient.clearScroll(any(ClearScrollRequest.class))).thenThrow(OpenSearchException.class); + when(openSearchClient.clearScroll(any(ClearScrollRequest.class))).thenThrow(OpenSearchException.class); createObjectUnderTest().deleteScroll(deleteScrollRequest); } @@ -406,7 +403,7 @@ void delete_pit_does_not_throw_exception_when_client_throws_IOException() throws final DeletePointInTimeRequest deletePointInTimeRequest = mock(DeletePointInTimeRequest.class); when(deletePointInTimeRequest.getPitId()).thenReturn(pitId); - when(asyncOpenSearchClient.deletePit(any(DeletePitRequest.class))).thenThrow(IOException.class); + when(openSearchClient.deletePit(any(DeletePitRequest.class))).thenThrow(IOException.class); createObjectUnderTest().deletePit(deletePointInTimeRequest); } @@ -419,7 +416,7 @@ void delete_scroll_does_not_throw_during_IO_exception() throws IOException { when(deleteScrollRequest.getScrollId()).thenReturn(scrollId); - when(asyncOpenSearchClient.clearScroll(any(ClearScrollRequest.class))).thenThrow(IOException.class); + when(openSearchClient.clearScroll(any(ClearScrollRequest.class))).thenThrow(IOException.class); createObjectUnderTest().deleteScroll(deleteScrollRequest); } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java index 794eb022c1..097d0e1250 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java @@ -77,27 +77,6 @@ void provideOpenSearchClient_with_username_and_password() { } - @Test - void provideAsyncOpenSearchClient_with_username_and_password() { - final String username = UUID.randomUUID().toString(); - final String password = UUID.randomUUID().toString(); - when(openSearchSourceConfiguration.getUsername()).thenReturn(username); - when(openSearchSourceConfiguration.getPassword()).thenReturn(password); - - when(connectionConfiguration.getCertPath()).thenReturn(null); - when(connectionConfiguration.getSocketTimeout()).thenReturn(null); - when(connectionConfiguration.getConnectTimeout()).thenReturn(null); - when(connectionConfiguration.isInsecure()).thenReturn(true); - - when(openSearchSourceConfiguration.getAwsAuthenticationOptions()).thenReturn(null); - - final OpenSearchClient openSearchClient = createObjectUnderTest().provideOpenSearchAsyncClient(openSearchSourceConfiguration); - assertThat(openSearchClient, notNullValue()); - - verifyNoInteractions(awsCredentialsSupplier); - - } - @Test void provideElasticSearchClient_with_username_and_password() { final String username = UUID.randomUUID().toString(); @@ -146,7 +125,6 @@ void provideElasticSearchClient_with_aws_auth() { @Test void provideOpenSearchClient_with_aws_auth() { when(connectionConfiguration.getCertPath()).thenReturn(null); - when(connectionConfiguration.getSocketTimeout()).thenReturn(null); when(connectionConfiguration.getConnectTimeout()).thenReturn(null); final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class); @@ -171,33 +149,6 @@ void provideOpenSearchClient_with_aws_auth() { assertThat(awsCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn)); } - @Test - void provideAsyncOpenSearchClient_with_aws_auth() { - when(connectionConfiguration.getCertPath()).thenReturn(null); - when(connectionConfiguration.getConnectTimeout()).thenReturn(null); - - final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class); - when(awsAuthenticationConfiguration.getAwsRegion()).thenReturn(Region.US_EAST_1); - final String stsRoleArn = "arn:aws:iam::123456789012:role/my-role"; - when(awsAuthenticationConfiguration.getAwsStsRoleArn()).thenReturn(stsRoleArn); - when(awsAuthenticationConfiguration.getAwsStsHeaderOverrides()).thenReturn(Collections.emptyMap()); - when(awsAuthenticationConfiguration.isServerlessCollection()).thenReturn(false); - when(openSearchSourceConfiguration.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationConfiguration); - - final ArgumentCaptor awsCredentialsOptionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class); - final AwsCredentialsProvider awsCredentialsProvider = mock(AwsCredentialsProvider.class); - when(awsCredentialsSupplier.getProvider(awsCredentialsOptionsArgumentCaptor.capture())).thenReturn(awsCredentialsProvider); - - final OpenSearchClient openSearchClient = createObjectUnderTest().provideOpenSearchAsyncClient(openSearchSourceConfiguration); - assertThat(openSearchClient, notNullValue()); - - final AwsCredentialsOptions awsCredentialsOptions = awsCredentialsOptionsArgumentCaptor.getValue(); - assertThat(awsCredentialsOptions, notNullValue()); - assertThat(awsCredentialsOptions.getRegion(), equalTo(Region.US_EAST_1)); - assertThat(awsCredentialsOptions.getStsHeaderOverrides(), equalTo(Collections.emptyMap())); - assertThat(awsCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn)); - } - @Test void provideElasticSearchClient_with_auth_disabled() { when(openSearchSourceConfiguration.isAuthenticationDisabled()).thenReturn(true); @@ -235,7 +186,6 @@ void provideOpenSearchClient_with_auth_disabled() { @Test void provideOpenSearchClient_with_aws_auth_and_serverless_flag_true() { when(connectionConfiguration.getCertPath()).thenReturn(null); - when(connectionConfiguration.getSocketTimeout()).thenReturn(null); when(connectionConfiguration.getConnectTimeout()).thenReturn(null); final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class); diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java index 23ae8f4e96..c7c7331dab 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java @@ -64,8 +64,6 @@ void testHappyPath_for_different_point_in_time_versions_for_opensearch(final Str final OpenSearchClient openSearchClient = mock(OpenSearchClient.class); when(openSearchClient.info()).thenReturn(infoResponse); when(openSearchClientFactory.provideOpenSearchClient(openSearchSourceConfiguration)).thenReturn(openSearchClient); - when(openSearchClientFactory.provideOpenSearchAsyncClient(openSearchSourceConfiguration)).thenReturn(mock(OpenSearchClient.class)); - final SearchAccessor searchAccessor = createObjectUnderTest().getSearchAccessor(); assertThat(searchAccessor, notNullValue()); assertThat(searchAccessor.getSearchContextType(), equalTo(SearchContextType.POINT_IN_TIME)); @@ -144,7 +142,6 @@ void testHappyPath_with_for_different_scroll_versions_for_opensearch(final Strin final OpenSearchClient openSearchClient = mock(OpenSearchClient.class); when(openSearchClient.info()).thenReturn(infoResponse); when(openSearchClientFactory.provideOpenSearchClient(openSearchSourceConfiguration)).thenReturn(openSearchClient); - when(openSearchClientFactory.provideOpenSearchAsyncClient(openSearchSourceConfiguration)).thenReturn(mock(OpenSearchClient.class)); final SearchAccessor searchAccessor = createObjectUnderTest().getSearchAccessor(); assertThat(searchAccessor, notNullValue()); @@ -185,7 +182,6 @@ void search_context_type_set_to_none_uses_that_search_context_regardless_of_vers final OpenSearchClient openSearchClient = mock(OpenSearchClient.class); when(openSearchClient.info()).thenReturn(infoResponse); when(openSearchClientFactory.provideOpenSearchClient(openSearchSourceConfiguration)).thenReturn(openSearchClient); - when(openSearchClientFactory.provideOpenSearchAsyncClient(openSearchSourceConfiguration)).thenReturn(mock(OpenSearchClient.class)); final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); when(searchConfiguration.getSearchContextType()).thenReturn(SearchContextType.NONE); @@ -212,15 +208,16 @@ void serverless_flag_true_defaults_to_search_context_type_none() { assertThat(searchAccessor.getSearchContextType(), equalTo(SearchContextType.NONE)); } - @Test - void serverless_flag_true_throws_InvalidPluginConfiguration_if_search_context_type_is_point_in_time() { + @ParameterizedTest + @ValueSource(strings = {"POINT_IN_TIME", "SCROLL"}) + void serverless_flag_true_throws_InvalidPluginConfiguration_if_search_context_type_is_point_in_time_or_scroll(final String searchContextType) { final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class); when(awsAuthenticationConfiguration.isServerlessCollection()).thenReturn(true); when(openSearchSourceConfiguration.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationConfiguration); final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); - when(searchConfiguration.getSearchContextType()).thenReturn(SearchContextType.POINT_IN_TIME); + when(searchConfiguration.getSearchContextType()).thenReturn(SearchContextType.valueOf(searchContextType)); when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); final SearchAccessorStrategy objectUnderTest = createObjectUnderTest(); @@ -229,7 +226,7 @@ void serverless_flag_true_throws_InvalidPluginConfiguration_if_search_context_ty } @ParameterizedTest - @ValueSource(strings = {"NONE", "SCROLL"}) + @ValueSource(strings = {"NONE"}) void serverless_flag_true_uses_search_context_type_from_config(final String searchContextType) { final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class);