Skip to content

Commit

Permalink
Swap opensearch source sdk client from Apache to Netty
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 committed Sep 15, 2023
1 parent af2e4b1 commit 4038673
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,12 @@ public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory
static final String SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE = "Trying to create too many scroll contexts";

private final OpenSearchClient openSearchClient;
private final OpenSearchClient openSearchAsyncClient;
private final SearchContextType searchContextType;

public OpenSearchAccessor(final OpenSearchClient openSearchClient,
final OpenSearchClient asyncOpenSearchClient,
final SearchContextType searchContextType) {
this.openSearchClient = openSearchClient;
this.searchContextType = searchContextType;
this.openSearchAsyncClient = asyncOpenSearchClient;
}

@Override
Expand Down Expand Up @@ -130,7 +127,7 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest
@Override
public void deletePit(final DeletePointInTimeRequest deletePointInTimeRequest) {
try {
final DeletePitResponse deletePitResponse = openSearchAsyncClient.deletePit(DeletePitRequest.of(builder -> builder.pitId(Collections.singletonList(deletePointInTimeRequest.getPitId()))));
final DeletePitResponse deletePitResponse = openSearchClient.deletePit(DeletePitRequest.of(builder -> builder.pitId(Collections.singletonList(deletePointInTimeRequest.getPitId()))));
if (isPitDeletedSuccessfully(deletePitResponse)) {
LOG.debug("Successfully deleted point in time id {}", deletePointInTimeRequest.getPitId());
} else {
Expand Down Expand Up @@ -197,7 +194,7 @@ public SearchScrollResponse searchWithScroll(final SearchScrollRequest searchScr
@Override
public void deleteScroll(final DeleteScrollRequest deleteScrollRequest) {
try {
final ClearScrollResponse clearScrollResponse = openSearchAsyncClient.clearScroll(ClearScrollRequest.of(request -> request.scrollId(deleteScrollRequest.getScrollId())));
final ClearScrollResponse clearScrollResponse = openSearchClient.clearScroll(ClearScrollRequest.of(request -> request.scrollId(deleteScrollRequest.getScrollId())));
if (clearScrollResponse.succeeded()) {
LOG.debug("Successfully deleted scroll context with id {}", deleteScrollRequest.getScrollId());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
Expand Down Expand Up @@ -76,18 +75,7 @@ private OpenSearchClientFactory(final AwsCredentialsSupplier awsCredentialsSuppl
public OpenSearchClient provideOpenSearchClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
OpenSearchTransport transport;
if (Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions())) {
transport = createOpenSearchTransportForAws(openSearchSourceConfiguration, false);
} else {
final RestClient restClient = createOpenSearchRestClient(openSearchSourceConfiguration);
transport = createOpenSearchTransport(restClient);
}
return new OpenSearchClient(transport);
}

public OpenSearchClient provideOpenSearchAsyncClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
OpenSearchTransport transport;
if (Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions())) {
transport = createOpenSearchTransportForAws(openSearchSourceConfiguration, true);
transport = createOpenSearchTransportForAws(openSearchSourceConfiguration);
} else {
final RestClient restClient = createOpenSearchRestClient(openSearchSourceConfiguration);
transport = createOpenSearchTransport(restClient);
Expand All @@ -105,7 +93,7 @@ private OpenSearchTransport createOpenSearchTransport(final RestClient restClien
return new RestClientTransport(restClient, new JacksonJsonpMapper());
}

private OpenSearchTransport createOpenSearchTransportForAws(final OpenSearchSourceConfiguration openSearchSourceConfiguration, final boolean async) {
private OpenSearchTransport createOpenSearchTransportForAws(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder()
.withRegion(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion())
.withStsRoleArn(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsStsRoleArn())
Expand All @@ -116,42 +104,15 @@ private OpenSearchTransport createOpenSearchTransportForAws(final OpenSearchSour
final boolean isServerlessCollection = Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions()) &&
openSearchSourceConfiguration.getAwsAuthenticationOptions().isServerlessCollection();

if (!async) {
return new AwsSdk2Transport(createSdkHttpClient(openSearchSourceConfiguration),
HttpHost.create(openSearchSourceConfiguration.getHosts().get(0)).getHostName(),
isServerlessCollection ? AOSS_SERVICE_NAME : AOS_SERVICE_NAME,
openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion(),
AwsSdk2TransportOptions.builder()
.setCredentials(awsCredentialsProvider)
.setMapper(new JacksonJsonpMapper())
.build());
} else {
return new AwsSdk2Transport(createSdkAsyncHttpClient(openSearchSourceConfiguration),
HttpHost.create(openSearchSourceConfiguration.getHosts().get(0)).getHostName(),
isServerlessCollection ? AOSS_SERVICE_NAME : AOS_SERVICE_NAME,
openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion(),
AwsSdk2TransportOptions.builder()
.setCredentials(awsCredentialsProvider)
.setMapper(new JacksonJsonpMapper())
.build());
}

}

private SdkHttpClient createSdkHttpClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
final ApacheHttpClient.Builder apacheHttpClientBuilder = ApacheHttpClient.builder();

if (Objects.nonNull(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout())) {
apacheHttpClientBuilder.connectionTimeout(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout());
}

if (Objects.nonNull(openSearchSourceConfiguration.getConnectionConfiguration().getSocketTimeout())) {
apacheHttpClientBuilder.socketTimeout(openSearchSourceConfiguration.getConnectionConfiguration().getSocketTimeout());
}

attachSSLContext(apacheHttpClientBuilder, openSearchSourceConfiguration);
return new AwsSdk2Transport(createSdkAsyncHttpClient(openSearchSourceConfiguration),
HttpHost.create(openSearchSourceConfiguration.getHosts().get(0)).getHostName(),
isServerlessCollection ? AOSS_SERVICE_NAME : AOS_SERVICE_NAME,
openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion(),
AwsSdk2TransportOptions.builder()
.setCredentials(awsCredentialsProvider)
.setMapper(new JacksonJsonpMapper())
.build());

return apacheHttpClientBuilder.build();
}

public SdkAsyncHttpClient createSdkAsyncHttpClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,13 @@ public SearchAccessor getSearchAccessor() {
searchContextType = SearchContextType.POINT_IN_TIME;
} else {
LOG.info("{} distribution, version {} detected. Scroll contexts will be used to search documents. " +
"Upgrade your cluster to at least version {} to use Point in Time APIs instead of scroll.", distribution, versionNumber,
distribution.equals(ELASTICSEARCH_DISTRIBUTION) ? ELASTICSEARCH_POINT_IN_TIME_SUPPORT_VERSION_CUTOFF : OPENSEARCH_POINT_IN_TIME_SUPPORT_VERSION_CUTOFF);
"Upgrade your cluster to at least OpenSearch {} to use Point in Time APIs instead of scroll.", distribution, versionNumber,
OPENSEARCH_POINT_IN_TIME_SUPPORT_VERSION_CUTOFF);
searchContextType = SearchContextType.SCROLL;
}

if (Objects.isNull(elasticsearchClient)) {
return new OpenSearchAccessor(openSearchClient,
openSearchClientFactory.provideOpenSearchAsyncClient(openSearchSourceConfiguration),
searchContextType);
}

Expand All @@ -113,16 +112,15 @@ private SearchAccessor createSearchAccessorForServerlessCollection(final OpenSea
if (Objects.isNull(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) {
LOG.info("Configured with AOS serverless flag as true, defaulting to search_context_type as 'none', which uses search_after");
return new OpenSearchAccessor(openSearchClient,
openSearchClientFactory.provideOpenSearchAsyncClient(openSearchSourceConfiguration),
SearchContextType.NONE);
} else {
if (SearchContextType.POINT_IN_TIME.equals(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) {
throw new InvalidPluginConfigurationException("A search_context_type of point_in_time is not supported for serverless collections");
if (SearchContextType.POINT_IN_TIME.equals(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType()) ||
SearchContextType.SCROLL.equals(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) {
throw new InvalidPluginConfigurationException("A search_context_type of point_in_time or scroll is not supported for serverless collections");
}

LOG.info("Using search_context_type set in the config: '{}'", openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType().toString().toLowerCase());
return new OpenSearchAccessor(openSearchClient,
openSearchClientFactory.provideOpenSearchAsyncClient(openSearchSourceConfiguration),
openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,8 @@ public class OpenSearchAccessorTest {
@Mock
private OpenSearchClient openSearchClient;

@Mock
private OpenSearchClient asyncOpenSearchClient;

private SearchAccessor createObjectUnderTest() {
return new OpenSearchAccessor(openSearchClient, asyncOpenSearchClient, SearchContextType.POINT_IN_TIME);
return new OpenSearchAccessor(openSearchClient, SearchContextType.POINT_IN_TIME);
}

@Test
Expand Down Expand Up @@ -352,7 +349,7 @@ void delete_pit_with_no_exception_does_not_throw(final boolean successful) throw
when(deletePitRecord.successful()).thenReturn(successful);
when(deletePitResponse.pits()).thenReturn(Collections.singletonList(deletePitRecord));

when(asyncOpenSearchClient.deletePit(any(DeletePitRequest.class))).thenReturn(deletePitResponse);
when(openSearchClient.deletePit(any(DeletePitRequest.class))).thenReturn(deletePitResponse);

createObjectUnderTest().deletePit(deletePointInTimeRequest);
}
Expand All @@ -369,7 +366,7 @@ void delete_scroll_with_no_exception_does_not_throw(final boolean successful) th
when(clearScrollResponse.succeeded()).thenReturn(successful);


when(asyncOpenSearchClient.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollResponse);
when(openSearchClient.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollResponse);

createObjectUnderTest().deleteScroll(deleteScrollRequest);
}
Expand All @@ -381,7 +378,7 @@ void delete_pit_does_not_throw_during_opensearch_exception() throws IOException
final DeletePointInTimeRequest deletePointInTimeRequest = mock(DeletePointInTimeRequest.class);
when(deletePointInTimeRequest.getPitId()).thenReturn(pitId);

when(asyncOpenSearchClient.deletePit(any(DeletePitRequest.class))).thenThrow(OpenSearchException.class);
when(openSearchClient.deletePit(any(DeletePitRequest.class))).thenThrow(OpenSearchException.class);

createObjectUnderTest().deletePit(deletePointInTimeRequest);
}
Expand All @@ -394,7 +391,7 @@ void delete_scroll_does_not_throw_during_opensearch_exception() throws IOExcepti
when(deleteScrollRequest.getScrollId()).thenReturn(scrollId);


when(asyncOpenSearchClient.clearScroll(any(ClearScrollRequest.class))).thenThrow(OpenSearchException.class);
when(openSearchClient.clearScroll(any(ClearScrollRequest.class))).thenThrow(OpenSearchException.class);

createObjectUnderTest().deleteScroll(deleteScrollRequest);
}
Expand All @@ -406,7 +403,7 @@ void delete_pit_does_not_throw_exception_when_client_throws_IOException() throws
final DeletePointInTimeRequest deletePointInTimeRequest = mock(DeletePointInTimeRequest.class);
when(deletePointInTimeRequest.getPitId()).thenReturn(pitId);

when(asyncOpenSearchClient.deletePit(any(DeletePitRequest.class))).thenThrow(IOException.class);
when(openSearchClient.deletePit(any(DeletePitRequest.class))).thenThrow(IOException.class);

createObjectUnderTest().deletePit(deletePointInTimeRequest);
}
Expand All @@ -419,7 +416,7 @@ void delete_scroll_does_not_throw_during_IO_exception() throws IOException {
when(deleteScrollRequest.getScrollId()).thenReturn(scrollId);


when(asyncOpenSearchClient.clearScroll(any(ClearScrollRequest.class))).thenThrow(IOException.class);
when(openSearchClient.clearScroll(any(ClearScrollRequest.class))).thenThrow(IOException.class);

createObjectUnderTest().deleteScroll(deleteScrollRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,27 +77,6 @@ void provideOpenSearchClient_with_username_and_password() {

}

@Test
void provideAsyncOpenSearchClient_with_username_and_password() {
final String username = UUID.randomUUID().toString();
final String password = UUID.randomUUID().toString();
when(openSearchSourceConfiguration.getUsername()).thenReturn(username);
when(openSearchSourceConfiguration.getPassword()).thenReturn(password);

when(connectionConfiguration.getCertPath()).thenReturn(null);
when(connectionConfiguration.getSocketTimeout()).thenReturn(null);
when(connectionConfiguration.getConnectTimeout()).thenReturn(null);
when(connectionConfiguration.isInsecure()).thenReturn(true);

when(openSearchSourceConfiguration.getAwsAuthenticationOptions()).thenReturn(null);

final OpenSearchClient openSearchClient = createObjectUnderTest().provideOpenSearchAsyncClient(openSearchSourceConfiguration);
assertThat(openSearchClient, notNullValue());

verifyNoInteractions(awsCredentialsSupplier);

}

@Test
void provideElasticSearchClient_with_username_and_password() {
final String username = UUID.randomUUID().toString();
Expand Down Expand Up @@ -146,7 +125,6 @@ void provideElasticSearchClient_with_aws_auth() {
@Test
void provideOpenSearchClient_with_aws_auth() {
when(connectionConfiguration.getCertPath()).thenReturn(null);
when(connectionConfiguration.getSocketTimeout()).thenReturn(null);
when(connectionConfiguration.getConnectTimeout()).thenReturn(null);

final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class);
Expand All @@ -171,33 +149,6 @@ void provideOpenSearchClient_with_aws_auth() {
assertThat(awsCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn));
}

@Test
void provideAsyncOpenSearchClient_with_aws_auth() {
when(connectionConfiguration.getCertPath()).thenReturn(null);
when(connectionConfiguration.getConnectTimeout()).thenReturn(null);

final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class);
when(awsAuthenticationConfiguration.getAwsRegion()).thenReturn(Region.US_EAST_1);
final String stsRoleArn = "arn:aws:iam::123456789012:role/my-role";
when(awsAuthenticationConfiguration.getAwsStsRoleArn()).thenReturn(stsRoleArn);
when(awsAuthenticationConfiguration.getAwsStsHeaderOverrides()).thenReturn(Collections.emptyMap());
when(awsAuthenticationConfiguration.isServerlessCollection()).thenReturn(false);
when(openSearchSourceConfiguration.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationConfiguration);

final ArgumentCaptor<AwsCredentialsOptions> awsCredentialsOptionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class);
final AwsCredentialsProvider awsCredentialsProvider = mock(AwsCredentialsProvider.class);
when(awsCredentialsSupplier.getProvider(awsCredentialsOptionsArgumentCaptor.capture())).thenReturn(awsCredentialsProvider);

final OpenSearchClient openSearchClient = createObjectUnderTest().provideOpenSearchAsyncClient(openSearchSourceConfiguration);
assertThat(openSearchClient, notNullValue());

final AwsCredentialsOptions awsCredentialsOptions = awsCredentialsOptionsArgumentCaptor.getValue();
assertThat(awsCredentialsOptions, notNullValue());
assertThat(awsCredentialsOptions.getRegion(), equalTo(Region.US_EAST_1));
assertThat(awsCredentialsOptions.getStsHeaderOverrides(), equalTo(Collections.emptyMap()));
assertThat(awsCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn));
}

@Test
void provideElasticSearchClient_with_auth_disabled() {
when(openSearchSourceConfiguration.isAuthenticationDisabled()).thenReturn(true);
Expand Down Expand Up @@ -235,7 +186,6 @@ void provideOpenSearchClient_with_auth_disabled() {
@Test
void provideOpenSearchClient_with_aws_auth_and_serverless_flag_true() {
when(connectionConfiguration.getCertPath()).thenReturn(null);
when(connectionConfiguration.getSocketTimeout()).thenReturn(null);
when(connectionConfiguration.getConnectTimeout()).thenReturn(null);

final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class);
Expand Down
Loading

0 comments on commit 4038673

Please sign in to comment.