Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: opensearch source secrets refreshment suppport #3437

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
edeb6bf
INIT: secrets refreshment infra
chenqi0805 Sep 28, 2023
2661629
MAINT: add interval and test validity
chenqi0805 Sep 29, 2023
9ae948b
MAINT: some more refactoring
chenqi0805 Sep 30, 2023
2e33eac
MAINT: delete unused classes
chenqi0805 Sep 30, 2023
6421c50
TST: AwsSecretsPluginConfigPublisherExtensionProviderTest
chenqi0805 Sep 30, 2023
3256efb
MAINT: inject PluginConfigPublisher into PluginCreator
chenqi0805 Sep 30, 2023
3ba133f
MAINT: complete test cases for AwsSecretPluginIT
chenqi0805 Sep 30, 2023
9439296
MAINT: test refresh secrets
chenqi0805 Sep 30, 2023
d5fe312
MAINT: refactoring and documentation
chenqi0805 Oct 1, 2023
c8af912
STY: import
chenqi0805 Oct 1, 2023
af0c206
MAINT: fix test cases
chenqi0805 Oct 1, 2023
ebdc448
MAINT: missing test case
chenqi0805 Oct 1, 2023
94513cf
ADD: client refresher
chenqi0805 Oct 2, 2023
93b3794
REF: client -> clientRefresher
chenqi0805 Oct 2, 2023
06a3c27
MAINT: refactor method and add listener
chenqi0805 Oct 2, 2023
1ba290e
MAINT: address minor comments
chenqi0805 Oct 5, 2023
0bd7a0f
REF: PluginConfigurationObservableRegister
chenqi0805 Oct 5, 2023
ca74d46
Merge branch 'enh/secret-refreshment-infra' into enh/opensearch-sourc…
chenqi0805 Oct 5, 2023
a7d7fd0
MAINT: pluginConfigObserver
chenqi0805 Oct 5, 2023
613770f
Merge branch 'main' into enh/opensearch-source-secrets-refreshment-su…
chenqi0805 Oct 5, 2023
c8fc2fe
MNT: createObjectUnderTest
chenqi0805 Oct 6, 2023
8f90a0e
REF: share common interface
chenqi0805 Oct 6, 2023
43972fb
MAINT: use interface
chenqi0805 Oct 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
* @since 2.5
*/
public interface PluginComponentRefresher<PluginComponent, PluginConfig> {
/**
* Returns the {@link PluginComponent} class.
*
* @return {@link PluginComponent} class.
*/
Class<PluginComponent> getComponentClass();

/**
* Returns the refreshed {@link PluginComponent}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.opensearch.dataprepper.plugins.source.opensearch;

import org.opensearch.dataprepper.model.plugin.PluginComponentRefresher;

import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

public class ClientRefresher<Client>
implements PluginComponentRefresher<Client, OpenSearchSourceConfiguration> {
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Function<OpenSearchSourceConfiguration, Client> clientFunction;
private OpenSearchSourceConfiguration existingConfig;
private final Class<Client> clientClass;
private Client currentClient;

public ClientRefresher(final Class<Client> clientClass,
final Function<OpenSearchSourceConfiguration, Client> clientFunction,
final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
this.clientClass = clientClass;
this.clientFunction = clientFunction;
existingConfig = openSearchSourceConfiguration;
currentClient = clientFunction.apply(openSearchSourceConfiguration);
}

@Override
public Class<Client> getComponentClass() {
return clientClass;
}

@Override
public Client get() {
readWriteLock.readLock().lock();
try {
return currentClient;
} finally {
readWriteLock.readLock().unlock();
}
Comment on lines +34 to +39
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these locks needed just in case we had multiple worker threads going at once?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. when secrets are configured there are at least two threads accessing this: pipeline worker thread and secret polling thread

}

@Override
public void update(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
if (basicAuthChanged(openSearchSourceConfiguration)) {
readWriteLock.writeLock().lock();
try {
currentClient = clientFunction.apply(openSearchSourceConfiguration);
existingConfig = openSearchSourceConfiguration;
} finally {
readWriteLock.writeLock().unlock();
}
}
}

private boolean basicAuthChanged(final OpenSearchSourceConfiguration newConfig) {
return !Objects.equals(existingConfig.getUsername(), newConfig.getUsername()) ||
!Objects.equals(existingConfig.getPassword(), newConfig.getPassword());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
Expand All @@ -29,6 +30,7 @@ public class OpenSearchSource implements Source<Record<Event>>, UsesSourceCoordi
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final AcknowledgementSetManager acknowledgementSetManager;
private final PluginMetrics pluginMetrics;
private final PluginConfigObservable pluginConfigObservable;

private SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private OpenSearchService openSearchService;
Expand All @@ -37,11 +39,13 @@ public class OpenSearchSource implements Source<Record<Event>>, UsesSourceCoordi
public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final AwsCredentialsSupplier awsCredentialsSupplier,
final AcknowledgementSetManager acknowledgementSetManager,
final PluginMetrics pluginMetrics) {
final PluginMetrics pluginMetrics,
final PluginConfigObservable pluginConfigObservable) {
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.awsCredentialsSupplier = awsCredentialsSupplier;
this.acknowledgementSetManager = acknowledgementSetManager;
this.pluginMetrics = pluginMetrics;
this.pluginConfigObservable = pluginConfigObservable;

openSearchSourceConfiguration.validateAwsConfigWithUsernameAndPassword();
}
Expand All @@ -59,7 +63,8 @@ private void startProcess(final OpenSearchSourceConfiguration openSearchSourceCo

final OpenSearchClientFactory openSearchClientFactory = OpenSearchClientFactory.create(awsCredentialsSupplier);
final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics = OpenSearchSourcePluginMetrics.create(pluginMetrics);
final SearchAccessorStrategy searchAccessorStrategy = SearchAccessorStrategy.create(openSearchSourceConfiguration, openSearchClientFactory);
final SearchAccessorStrategy searchAccessorStrategy = SearchAccessorStrategy.create(
openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the code always assume that the pluginConfigObservable is always non-null, is that right? Is OpenSearchSource without key refresh not allowed anymore?

Copy link
Collaborator Author

@chenqi0805 chenqi0805 Oct 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is indeed always non-null. If there is no secret usage, the PluginConfigPublisher notifier would never be invoked even though the PluginConfigObservable will still be instantiated and loaded with PluginConfigObserver, i.e. PluginConfigObservable::update will never be invoked. At the client level, this means there is only read access to client although there is indeed a bit overhead from acquiring the read lock.


final SearchAccessor searchAccessor = searchAccessorStrategy.getSearchAccessor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.cat.IndicesResponse;
import org.opensearch.dataprepper.model.plugin.PluginComponentRefresher;
import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.IndexParametersConfiguration;
Expand All @@ -33,34 +34,37 @@ public class OpenSearchIndexPartitionCreationSupplier implements Function<Map<St

private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final IndexParametersConfiguration indexParametersConfiguration;
private OpenSearchClient openSearchClient;
private ElasticsearchClient elasticsearchClient;
private PluginComponentRefresher<OpenSearchClient, OpenSearchSourceConfiguration> openSearchClientRefresher;
private PluginComponentRefresher<ElasticsearchClient, OpenSearchSourceConfiguration> elasticsearchClientRefresher;


public OpenSearchIndexPartitionCreationSupplier(final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final ClusterClientFactory clusterClientFactory) {
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.indexParametersConfiguration = openSearchSourceConfiguration.getIndexParametersConfiguration();

final Object client = clusterClientFactory.getClient();
final PluginComponentRefresher<?, OpenSearchSourceConfiguration> clientRefresher =
clusterClientFactory.getClientRefresher();

if (client instanceof OpenSearchClient) {
this.openSearchClient = (OpenSearchClient) client;
} else if (client instanceof ElasticsearchClient) {
this.elasticsearchClient = (ElasticsearchClient) client;
if (OpenSearchClient.class.isAssignableFrom(clientRefresher.getComponentClass())) {
this.openSearchClientRefresher = (PluginComponentRefresher<OpenSearchClient, OpenSearchSourceConfiguration>)
clientRefresher;
} else if (ElasticsearchClient.class.isAssignableFrom(clientRefresher.getComponentClass())) {
this.elasticsearchClientRefresher =
(PluginComponentRefresher<ElasticsearchClient, OpenSearchSourceConfiguration>) clientRefresher;
} else {
throw new IllegalArgumentException(String.format("ClusterClientFactory provided an invalid client object to the index partition creation supplier. " +
"The client must be of type OpenSearchClient. The client passed is of class %s", client.getClass()));
"The clientRefresher must be of type OpenSearchClientRefresher. The clientRefresher passed is of class %s", clientRefresher.getClass()));
}

}

@Override
public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap) {

if (Objects.nonNull(openSearchClient)) {
if (Objects.nonNull(openSearchClientRefresher)) {
return applyForOpenSearchClient(globalStateMap);
} else if (Objects.nonNull(elasticsearchClient)) {
} else if (Objects.nonNull(elasticsearchClientRefresher)) {
return applyForElasticSearchClient(globalStateMap);
}

Expand All @@ -70,7 +74,7 @@ public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap)
private List<PartitionIdentifier> applyForOpenSearchClient(final Map<String, Object> globalStateMap) {
IndicesResponse indicesResponse;
try {
indicesResponse = openSearchClient.cat().indices();
indicesResponse = openSearchClientRefresher.get().cat().indices();
} catch (IOException | OpenSearchException e) {
LOG.error("There was an exception when calling /_cat/indices to create new index partitions", e);
return Collections.emptyList();
Expand All @@ -87,7 +91,7 @@ private List<PartitionIdentifier> applyForOpenSearchClient(final Map<String, Obj
private List<PartitionIdentifier> applyForElasticSearchClient(final Map<String, Object> globalStateMap) {
co.elastic.clients.elasticsearch.cat.IndicesResponse indicesResponse;
try {
indicesResponse = elasticsearchClient.cat().indices();
indicesResponse = elasticsearchClientRefresher.get().cat().indices();
} catch (IOException | ElasticsearchException e) {
LOG.error("There was an exception when calling /_cat/indices to create new index partitions", e);
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.dataprepper.plugins.source.opensearch.worker.client;

public interface ClusterClientFactory {
Object getClient();
import org.opensearch.dataprepper.model.plugin.PluginComponentRefresher;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;

public interface ClusterClientFactory<Client> {
PluginComponentRefresher<Client, OpenSearchSourceConfiguration> getClientRefresher();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In line with my comment elsewhere, change this to:

Supplier<Client> getClientSupplier();

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably would not work for OpenSearchIndexPartitionCreationSupplier. Changed into

ClientRefresher<Client> getClientRefresher();

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can make ClientRefresher implement Supplier<C>. Then you can use a simpler interface in the majority of the code.

Copy link
Collaborator Author

@chenqi0805 chenqi0805 Oct 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately there are some tight coupling in using APIs, e.g. OpenSearchIndexPartitionCreationSupplier needs the SearchAccessor to return more than Supplier. I improved it to be propagating PluginComponentRefresher<Client, OpenSearchConfiguration>

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.PluginComponentRefresher;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest;
Expand Down Expand Up @@ -55,18 +57,21 @@
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME;

public class ElasticsearchAccessor implements SearchAccessor, ClusterClientFactory {
public class ElasticsearchAccessor implements SearchAccessor, ClusterClientFactory<ElasticsearchClient> {

private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchAccessor.class);

static final String PIT_RESOURCE_LIMIT_ERROR_TYPE = "rejected_execution_exception";
static final String INDEX_NOT_FOUND_EXCEPTION = "index_not_found_exception";

private final ElasticsearchClient elasticsearchClient;
private final PluginComponentRefresher<ElasticsearchClient, OpenSearchSourceConfiguration>
elasticsearchClientRefresher;
private final SearchContextType searchContextType;

public ElasticsearchAccessor(final ElasticsearchClient elasticsearchClient, final SearchContextType searchContextType) {
this.elasticsearchClient = elasticsearchClient;
public ElasticsearchAccessor(final PluginComponentRefresher<ElasticsearchClient, OpenSearchSourceConfiguration>
elasticsearchClientRefresher,
final SearchContextType searchContextType) {
this.elasticsearchClientRefresher = elasticsearchClientRefresher;
this.searchContextType = searchContextType;
}

Expand All @@ -80,7 +85,8 @@ public CreatePointInTimeResponse createPit(final CreatePointInTimeRequest create

OpenPointInTimeResponse openPointInTimeResponse;
try {
openPointInTimeResponse = elasticsearchClient.openPointInTime(OpenPointInTimeRequest.of(request -> request
openPointInTimeResponse = elasticsearchClientRefresher.get()
.openPointInTime(OpenPointInTimeRequest.of(request -> request
.keepAlive(Time.of(time -> time.time(createPointInTimeRequest.getKeepAlive())))
.index(createPointInTimeRequest.getIndex())));
} catch (final ElasticsearchException e) {
Expand Down Expand Up @@ -131,7 +137,8 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest
@Override
public void deletePit(final DeletePointInTimeRequest deletePointInTimeRequest) {
try {
final ClosePointInTimeResponse closePointInTimeResponse = elasticsearchClient.closePointInTime(ClosePointInTimeRequest.of(request -> request
final ClosePointInTimeResponse closePointInTimeResponse = elasticsearchClientRefresher.get()
.closePointInTime(ClosePointInTimeRequest.of(request -> request
.id(deletePointInTimeRequest.getPitId())));
if (closePointInTimeResponse.succeeded()) {
LOG.debug("Successfully deleted point in time id {}", deletePointInTimeRequest.getPitId());
Expand All @@ -149,7 +156,8 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR
SearchResponse<ObjectNode> searchResponse;

try {
searchResponse = elasticsearchClient.search(SearchRequest.of(request -> request
searchResponse = elasticsearchClientRefresher.get()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this change is somewhat intrusive to the code, and we don't currently have integration tests for this source, just curious what testing you did to verify no regression?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I run a smoke test with pipeline:

pipeline_configurations:
  aws:
    secrets:
      secret1:
        secret_id: my-es-secret
        region: us-east-1
        sts_role_arn: "arn:aws:iam::xxxx:role/ecs-test-role"
      secret2:
        secret_id: my-self-managed-os
        region: us-east-1
        sts_role_arn: "arn:aws:iam::xxxx:role/ecs-test-role"
opensearch-migration-pipeline:
  source:
    opensearch:
      hosts: [ "https://localhost:9200" ]
      username: "${{aws_secrets:secret2:username}}"
      password: "${{aws_secrets:secret2:password}}"
  processor:
    - date:
        from_time_received: true
  sink:
    - opensearch:
        hosts: [ "https://search-test-domain-r3dk3xkywa3vqqdzdqrddq3kyq.us-east-1.es.amazonaws.com" ]
        username: "${{aws_secrets:secret1:username}}"
        password: "${{aws_secrets:secret1:password}}"
        index: "test-migration"

.search(SearchRequest.of(request -> request
.scroll(Time.of(time -> time.time(createScrollRequest.getScrollTime())))
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
.size(createScrollRequest.getSize())
Expand Down Expand Up @@ -182,7 +190,8 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR
public SearchScrollResponse searchWithScroll(final SearchScrollRequest searchScrollRequest) {
SearchResponse<ObjectNode> searchResponse;
try {
searchResponse = elasticsearchClient.scroll(ScrollRequest.of(request -> request
searchResponse = elasticsearchClientRefresher.get()
.scroll(ScrollRequest.of(request -> request
.scrollId(searchScrollRequest.getScrollId())
.scroll(Time.of(time -> time.time(searchScrollRequest.getScrollTime())))), ObjectNode.class);
} catch (final ElasticsearchException e) {
Expand All @@ -202,7 +211,8 @@ public SearchScrollResponse searchWithScroll(final SearchScrollRequest searchScr
@Override
public void deleteScroll(DeleteScrollRequest deleteScrollRequest) {
try {
final ClearScrollResponse clearScrollResponse = elasticsearchClient.clearScroll(ClearScrollRequest.of(request -> request.scrollId(deleteScrollRequest.getScrollId())));
final ClearScrollResponse clearScrollResponse = elasticsearchClientRefresher.get()
.clearScroll(ClearScrollRequest.of(request -> request.scrollId(deleteScrollRequest.getScrollId())));
if (clearScrollResponse.succeeded()) {
LOG.debug("Successfully deleted scroll context with id {}", deleteScrollRequest.getScrollId());
} else {
Expand Down Expand Up @@ -236,14 +246,15 @@ public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchCon
}

@Override
public Object getClient() {
return elasticsearchClient;
public PluginComponentRefresher<ElasticsearchClient, OpenSearchSourceConfiguration> getClientRefresher() {
return elasticsearchClientRefresher;
}

private SearchWithSearchAfterResults searchWithSearchAfter(final SearchRequest searchRequest) {

try {
final SearchResponse<ObjectNode> searchResponse = elasticsearchClient.search(searchRequest, ObjectNode.class);
final SearchResponse<ObjectNode> searchResponse = elasticsearchClientRefresher.get()
.search(searchRequest, ObjectNode.class);

final List<Event> documents = getDocumentsFromResponse(searchResponse);

Expand Down
Loading
Loading