From af2e4b1d9fbfba96f7aeda1c904d1069d072a094 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Thu, 14 Sep 2023 11:59:26 -0500 Subject: [PATCH] Use async client to delete scroll and pit for OpenSearch as workaround for bug in client Signed-off-by: Taylor Gray --- .../opensearch-source/build.gradle | 1 + .../worker/client/OpenSearchAccessor.java | 10 ++- .../client/OpenSearchClientFactory.java | 62 ++++++++++++++++--- .../worker/client/SearchAccessorStrategy.java | 12 +++- .../worker/client/OpenSearchAccessorTest.java | 17 ++--- .../client/OpenSearchClientFactoryTest.java | 48 ++++++++++++++ .../client/SearchAccessStrategyTest.java | 3 + 7 files changed, 130 insertions(+), 23 deletions(-) diff --git a/data-prepper-plugins/opensearch-source/build.gradle b/data-prepper-plugins/opensearch-source/build.gradle index 1ffb49541b..c080eff561 100644 --- a/data-prepper-plugins/opensearch-source/build.gradle +++ b/data-prepper-plugins/opensearch-source/build.gradle @@ -7,6 +7,7 @@ dependencies { implementation project(':data-prepper-plugins:buffer-common') implementation project(':data-prepper-plugins:aws-plugin-api') implementation 'software.amazon.awssdk:apache-client' + implementation 'software.amazon.awssdk:netty-nio-client' implementation 'io.micrometer:micrometer-core' implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2' diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java index 741bb0f1e5..1a5b672525 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java @@ -63,11 +63,15 @@ public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory static final String SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE = "Trying to create too many scroll contexts"; private final OpenSearchClient openSearchClient; + private final OpenSearchClient openSearchAsyncClient; private final SearchContextType searchContextType; - public OpenSearchAccessor(final OpenSearchClient openSearchClient, final SearchContextType searchContextType) { + public OpenSearchAccessor(final OpenSearchClient openSearchClient, + final OpenSearchClient asyncOpenSearchClient, + final SearchContextType searchContextType) { this.openSearchClient = openSearchClient; this.searchContextType = searchContextType; + this.openSearchAsyncClient = asyncOpenSearchClient; } @Override @@ -126,7 +130,7 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest @Override public void deletePit(final DeletePointInTimeRequest deletePointInTimeRequest) { try { - final DeletePitResponse deletePitResponse = openSearchClient.deletePit(DeletePitRequest.of(builder -> builder.pitId(Collections.singletonList(deletePointInTimeRequest.getPitId())))); + final DeletePitResponse deletePitResponse = openSearchAsyncClient.deletePit(DeletePitRequest.of(builder -> builder.pitId(Collections.singletonList(deletePointInTimeRequest.getPitId())))); if (isPitDeletedSuccessfully(deletePitResponse)) { LOG.debug("Successfully deleted point in time id {}", deletePointInTimeRequest.getPitId()); } else { @@ -193,7 +197,7 @@ public SearchScrollResponse searchWithScroll(final SearchScrollRequest searchScr @Override public void deleteScroll(final DeleteScrollRequest deleteScrollRequest) { try { - final ClearScrollResponse clearScrollResponse = openSearchClient.clearScroll(ClearScrollRequest.of(request -> request.scrollId(deleteScrollRequest.getScrollId()))); + final ClearScrollResponse clearScrollResponse = openSearchAsyncClient.clearScroll(ClearScrollRequest.of(request -> request.scrollId(deleteScrollRequest.getScrollId()))); if (clearScrollResponse.succeeded()) { LOG.debug("Successfully deleted scroll context with id {}", deleteScrollRequest.getScrollId()); } else { diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java index f240f7bb24..b513a67c04 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java @@ -41,6 +41,8 @@ import software.amazon.awssdk.auth.signer.Aws4Signer; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; @@ -74,7 +76,18 @@ private OpenSearchClientFactory(final AwsCredentialsSupplier awsCredentialsSuppl public OpenSearchClient provideOpenSearchClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { OpenSearchTransport transport; if (Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions())) { - transport = createOpenSearchTransportForAws(openSearchSourceConfiguration); + transport = createOpenSearchTransportForAws(openSearchSourceConfiguration, false); + } else { + final RestClient restClient = createOpenSearchRestClient(openSearchSourceConfiguration); + transport = createOpenSearchTransport(restClient); + } + return new OpenSearchClient(transport); + } + + public OpenSearchClient provideOpenSearchAsyncClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { + OpenSearchTransport transport; + if (Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions())) { + transport = createOpenSearchTransportForAws(openSearchSourceConfiguration, true); } else { final RestClient restClient = createOpenSearchRestClient(openSearchSourceConfiguration); transport = createOpenSearchTransport(restClient); @@ -92,7 +105,7 @@ private OpenSearchTransport createOpenSearchTransport(final RestClient restClien return new RestClientTransport(restClient, new JacksonJsonpMapper()); } - private OpenSearchTransport createOpenSearchTransportForAws(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { + private OpenSearchTransport createOpenSearchTransportForAws(final OpenSearchSourceConfiguration openSearchSourceConfiguration, final boolean async) { final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder() .withRegion(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion()) .withStsRoleArn(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsStsRoleArn()) @@ -103,14 +116,26 @@ private OpenSearchTransport createOpenSearchTransportForAws(final OpenSearchSour final boolean isServerlessCollection = Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions()) && openSearchSourceConfiguration.getAwsAuthenticationOptions().isServerlessCollection(); - return new AwsSdk2Transport(createSdkHttpClient(openSearchSourceConfiguration), - HttpHost.create(openSearchSourceConfiguration.getHosts().get(0)).getHostName(), - isServerlessCollection ? AOSS_SERVICE_NAME : AOS_SERVICE_NAME, - openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion(), - AwsSdk2TransportOptions.builder() - .setCredentials(awsCredentialsProvider) - .setMapper(new JacksonJsonpMapper()) - .build()); + if (!async) { + return new AwsSdk2Transport(createSdkHttpClient(openSearchSourceConfiguration), + HttpHost.create(openSearchSourceConfiguration.getHosts().get(0)).getHostName(), + isServerlessCollection ? AOSS_SERVICE_NAME : AOS_SERVICE_NAME, + openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion(), + AwsSdk2TransportOptions.builder() + .setCredentials(awsCredentialsProvider) + .setMapper(new JacksonJsonpMapper()) + .build()); + } else { + return new AwsSdk2Transport(createSdkAsyncHttpClient(openSearchSourceConfiguration), + HttpHost.create(openSearchSourceConfiguration.getHosts().get(0)).getHostName(), + isServerlessCollection ? AOSS_SERVICE_NAME : AOS_SERVICE_NAME, + openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion(), + AwsSdk2TransportOptions.builder() + .setCredentials(awsCredentialsProvider) + .setMapper(new JacksonJsonpMapper()) + .build()); + } + } private SdkHttpClient createSdkHttpClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { @@ -129,6 +154,18 @@ private SdkHttpClient createSdkHttpClient(final OpenSearchSourceConfiguration op return apacheHttpClientBuilder.build(); } + public SdkAsyncHttpClient createSdkAsyncHttpClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { + final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder(); + + if (Objects.nonNull(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout())) { + builder.connectionTimeout(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout()); + } + + attachSSLContext(builder, openSearchSourceConfiguration); + + return builder.build(); + } + private RestClient createOpenSearchRestClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { final List hosts = openSearchSourceConfiguration.getHosts(); final HttpHost[] httpHosts = new HttpHost[hosts.size()]; @@ -274,6 +311,11 @@ private void attachSSLContext(final ApacheHttpClient.Builder apacheHttpClientBui apacheHttpClientBuilder.tlsTrustManagersProvider(() -> trustManagers); } + private void attachSSLContext(final NettyNioAsyncHttpClient.Builder asyncClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) { + TrustManager[] trustManagers = createTrustManagers(openSearchSourceConfiguration.getConnectionConfiguration().getCertPath()); + asyncClientBuilder.tlsTrustManagersProvider(() -> trustManagers); + } + private void attachSSLContext(final HttpAsyncClientBuilder httpClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) { final ConnectionConfiguration connectionConfiguration = openSearchSourceConfiguration.getConnectionConfiguration(); diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java index 7dee8ae140..9d2b6bfd3b 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java @@ -101,7 +101,9 @@ public SearchAccessor getSearchAccessor() { } if (Objects.isNull(elasticsearchClient)) { - return new OpenSearchAccessor(openSearchClient, searchContextType); + return new OpenSearchAccessor(openSearchClient, + openSearchClientFactory.provideOpenSearchAsyncClient(openSearchSourceConfiguration), + searchContextType); } return new ElasticsearchAccessor(elasticsearchClient, searchContextType); @@ -110,14 +112,18 @@ public SearchAccessor getSearchAccessor() { private SearchAccessor createSearchAccessorForServerlessCollection(final OpenSearchClient openSearchClient) { if (Objects.isNull(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) { LOG.info("Configured with AOS serverless flag as true, defaulting to search_context_type as 'none', which uses search_after"); - return new OpenSearchAccessor(openSearchClient, SearchContextType.NONE); + return new OpenSearchAccessor(openSearchClient, + openSearchClientFactory.provideOpenSearchAsyncClient(openSearchSourceConfiguration), + SearchContextType.NONE); } else { if (SearchContextType.POINT_IN_TIME.equals(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) { throw new InvalidPluginConfigurationException("A search_context_type of point_in_time is not supported for serverless collections"); } LOG.info("Using search_context_type set in the config: '{}'", openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType().toString().toLowerCase()); - return new OpenSearchAccessor(openSearchClient, openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType()); + return new OpenSearchAccessor(openSearchClient, + openSearchClientFactory.provideOpenSearchAsyncClient(openSearchSourceConfiguration), + openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType()); } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java index 69d0b68a5a..b4c3e4adcb 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java @@ -70,8 +70,11 @@ public class OpenSearchAccessorTest { @Mock private OpenSearchClient openSearchClient; + @Mock + private OpenSearchClient asyncOpenSearchClient; + private SearchAccessor createObjectUnderTest() { - return new OpenSearchAccessor(openSearchClient, SearchContextType.POINT_IN_TIME); + return new OpenSearchAccessor(openSearchClient, asyncOpenSearchClient, SearchContextType.POINT_IN_TIME); } @Test @@ -349,7 +352,7 @@ void delete_pit_with_no_exception_does_not_throw(final boolean successful) throw when(deletePitRecord.successful()).thenReturn(successful); when(deletePitResponse.pits()).thenReturn(Collections.singletonList(deletePitRecord)); - when(openSearchClient.deletePit(any(DeletePitRequest.class))).thenReturn(deletePitResponse); + when(asyncOpenSearchClient.deletePit(any(DeletePitRequest.class))).thenReturn(deletePitResponse); createObjectUnderTest().deletePit(deletePointInTimeRequest); } @@ -366,7 +369,7 @@ void delete_scroll_with_no_exception_does_not_throw(final boolean successful) th when(clearScrollResponse.succeeded()).thenReturn(successful); - when(openSearchClient.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollResponse); + when(asyncOpenSearchClient.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollResponse); createObjectUnderTest().deleteScroll(deleteScrollRequest); } @@ -378,7 +381,7 @@ void delete_pit_does_not_throw_during_opensearch_exception() throws IOException final DeletePointInTimeRequest deletePointInTimeRequest = mock(DeletePointInTimeRequest.class); when(deletePointInTimeRequest.getPitId()).thenReturn(pitId); - when(openSearchClient.deletePit(any(DeletePitRequest.class))).thenThrow(OpenSearchException.class); + when(asyncOpenSearchClient.deletePit(any(DeletePitRequest.class))).thenThrow(OpenSearchException.class); createObjectUnderTest().deletePit(deletePointInTimeRequest); } @@ -391,7 +394,7 @@ void delete_scroll_does_not_throw_during_opensearch_exception() throws IOExcepti when(deleteScrollRequest.getScrollId()).thenReturn(scrollId); - when(openSearchClient.clearScroll(any(ClearScrollRequest.class))).thenThrow(OpenSearchException.class); + when(asyncOpenSearchClient.clearScroll(any(ClearScrollRequest.class))).thenThrow(OpenSearchException.class); createObjectUnderTest().deleteScroll(deleteScrollRequest); } @@ -403,7 +406,7 @@ void delete_pit_does_not_throw_exception_when_client_throws_IOException() throws final DeletePointInTimeRequest deletePointInTimeRequest = mock(DeletePointInTimeRequest.class); when(deletePointInTimeRequest.getPitId()).thenReturn(pitId); - when(openSearchClient.deletePit(any(DeletePitRequest.class))).thenThrow(IOException.class); + when(asyncOpenSearchClient.deletePit(any(DeletePitRequest.class))).thenThrow(IOException.class); createObjectUnderTest().deletePit(deletePointInTimeRequest); } @@ -416,7 +419,7 @@ void delete_scroll_does_not_throw_during_IO_exception() throws IOException { when(deleteScrollRequest.getScrollId()).thenReturn(scrollId); - when(openSearchClient.clearScroll(any(ClearScrollRequest.class))).thenThrow(IOException.class); + when(asyncOpenSearchClient.clearScroll(any(ClearScrollRequest.class))).thenThrow(IOException.class); createObjectUnderTest().deleteScroll(deleteScrollRequest); } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java index 54497b2ce7..794eb022c1 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java @@ -77,6 +77,27 @@ void provideOpenSearchClient_with_username_and_password() { } + @Test + void provideAsyncOpenSearchClient_with_username_and_password() { + final String username = UUID.randomUUID().toString(); + final String password = UUID.randomUUID().toString(); + when(openSearchSourceConfiguration.getUsername()).thenReturn(username); + when(openSearchSourceConfiguration.getPassword()).thenReturn(password); + + when(connectionConfiguration.getCertPath()).thenReturn(null); + when(connectionConfiguration.getSocketTimeout()).thenReturn(null); + when(connectionConfiguration.getConnectTimeout()).thenReturn(null); + when(connectionConfiguration.isInsecure()).thenReturn(true); + + when(openSearchSourceConfiguration.getAwsAuthenticationOptions()).thenReturn(null); + + final OpenSearchClient openSearchClient = createObjectUnderTest().provideOpenSearchAsyncClient(openSearchSourceConfiguration); + assertThat(openSearchClient, notNullValue()); + + verifyNoInteractions(awsCredentialsSupplier); + + } + @Test void provideElasticSearchClient_with_username_and_password() { final String username = UUID.randomUUID().toString(); @@ -150,6 +171,33 @@ void provideOpenSearchClient_with_aws_auth() { assertThat(awsCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn)); } + @Test + void provideAsyncOpenSearchClient_with_aws_auth() { + when(connectionConfiguration.getCertPath()).thenReturn(null); + when(connectionConfiguration.getConnectTimeout()).thenReturn(null); + + final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class); + when(awsAuthenticationConfiguration.getAwsRegion()).thenReturn(Region.US_EAST_1); + final String stsRoleArn = "arn:aws:iam::123456789012:role/my-role"; + when(awsAuthenticationConfiguration.getAwsStsRoleArn()).thenReturn(stsRoleArn); + when(awsAuthenticationConfiguration.getAwsStsHeaderOverrides()).thenReturn(Collections.emptyMap()); + when(awsAuthenticationConfiguration.isServerlessCollection()).thenReturn(false); + when(openSearchSourceConfiguration.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationConfiguration); + + final ArgumentCaptor awsCredentialsOptionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class); + final AwsCredentialsProvider awsCredentialsProvider = mock(AwsCredentialsProvider.class); + when(awsCredentialsSupplier.getProvider(awsCredentialsOptionsArgumentCaptor.capture())).thenReturn(awsCredentialsProvider); + + final OpenSearchClient openSearchClient = createObjectUnderTest().provideOpenSearchAsyncClient(openSearchSourceConfiguration); + assertThat(openSearchClient, notNullValue()); + + final AwsCredentialsOptions awsCredentialsOptions = awsCredentialsOptionsArgumentCaptor.getValue(); + assertThat(awsCredentialsOptions, notNullValue()); + assertThat(awsCredentialsOptions.getRegion(), equalTo(Region.US_EAST_1)); + assertThat(awsCredentialsOptions.getStsHeaderOverrides(), equalTo(Collections.emptyMap())); + assertThat(awsCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn)); + } + @Test void provideElasticSearchClient_with_auth_disabled() { when(openSearchSourceConfiguration.isAuthenticationDisabled()).thenReturn(true); diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java index 203c1e984c..23ae8f4e96 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java @@ -64,6 +64,7 @@ void testHappyPath_for_different_point_in_time_versions_for_opensearch(final Str final OpenSearchClient openSearchClient = mock(OpenSearchClient.class); when(openSearchClient.info()).thenReturn(infoResponse); when(openSearchClientFactory.provideOpenSearchClient(openSearchSourceConfiguration)).thenReturn(openSearchClient); + when(openSearchClientFactory.provideOpenSearchAsyncClient(openSearchSourceConfiguration)).thenReturn(mock(OpenSearchClient.class)); final SearchAccessor searchAccessor = createObjectUnderTest().getSearchAccessor(); assertThat(searchAccessor, notNullValue()); @@ -143,6 +144,7 @@ void testHappyPath_with_for_different_scroll_versions_for_opensearch(final Strin final OpenSearchClient openSearchClient = mock(OpenSearchClient.class); when(openSearchClient.info()).thenReturn(infoResponse); when(openSearchClientFactory.provideOpenSearchClient(openSearchSourceConfiguration)).thenReturn(openSearchClient); + when(openSearchClientFactory.provideOpenSearchAsyncClient(openSearchSourceConfiguration)).thenReturn(mock(OpenSearchClient.class)); final SearchAccessor searchAccessor = createObjectUnderTest().getSearchAccessor(); assertThat(searchAccessor, notNullValue()); @@ -183,6 +185,7 @@ void search_context_type_set_to_none_uses_that_search_context_regardless_of_vers final OpenSearchClient openSearchClient = mock(OpenSearchClient.class); when(openSearchClient.info()).thenReturn(infoResponse); when(openSearchClientFactory.provideOpenSearchClient(openSearchSourceConfiguration)).thenReturn(openSearchClient); + when(openSearchClientFactory.provideOpenSearchAsyncClient(openSearchSourceConfiguration)).thenReturn(mock(OpenSearchClient.class)); final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); when(searchConfiguration.getSearchContextType()).thenReturn(SearchContextType.NONE);