ENH: opensearch source secrets refreshment suppport #3437
Signed-off-by: George Chen <[email protected]>
private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchAccessor.class);

static final String PIT_RESOURCE_LIMIT_ERROR_TYPE = "rejected_execution_exception";
static final String INDEX_NOT_FOUND_EXCEPTION = "index_not_found_exception";

private final ElasticsearchClient elasticsearchClient;
private final ElasticsearchClientRefresher elasticsearchClientRefresher;
This bit of code doesn't need to know or care about the refresh aspect. It only cares that it gets the client dynamically from a get() method. You can decouple this class from the refresh code by providing a Supplier<ElasticsearchClient> here.
Then, make ElasticsearchClientRefresher implement Supplier<ElasticsearchClient>.
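A minimal sketch of the decoupling suggested here, not the code from this PR. SearchClient, SupplierBackedAccessor, and SwappableClientRefresher are hypothetical stand-ins, and an AtomicReference is used instead of the PR's read/write lock to keep the example short.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical stand-in for ElasticsearchClient; not the real client API.
interface SearchClient {
    String search(String index);
}

// Hypothetical accessor: it only knows that it can ask for a client on every call.
class SupplierBackedAccessor {
    private final Supplier<SearchClient> clientSupplier;

    SupplierBackedAccessor(final Supplier<SearchClient> clientSupplier) {
        this.clientSupplier = clientSupplier;
    }

    String search(final String index) {
        // Resolve the client per call so that a refreshed client is picked up.
        return clientSupplier.get().search(index);
    }
}

// Hypothetical refresher: the only class aware that the client can be swapped.
class SwappableClientRefresher implements Supplier<SearchClient> {
    private final AtomicReference<SearchClient> current;

    SwappableClientRefresher(final SearchClient initial) {
        this.current = new AtomicReference<>(initial);
    }

    @Override
    public SearchClient get() {
        return current.get();
    }

    void refresh(final SearchClient replacement) {
        current.set(replacement);
    }
}
```

With this shape, the accessor can be unit tested with a plain lambda such as `() -> mockClient`, without constructing any refresher.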
@dlvenable Thanks for the comment. The tricky part of your approach is OpenSearchIndexPartitionCreationSupplier, where the two clients need to use different models to make the _cat/indices API call. I am looking into a workaround, but let me know if you have any suggestions.
You can use the generic OpenSearchClientRefresher in OpenSearchIndexPartitionCreationSupplier. That does not prevent you from using a Supplier<> in this code. See my other recent comment for how to make use of the generics there.
This class (ElasticsearchAccessor) can use the Supplier<> to decouple it from future changes.
There might be ways to further decouple OpenSearchIndexPartitionCreationSupplier, but we don't need to think through those right now.
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;

public interface ClusterClientFactory<Client> {
    PluginComponentRefresher<Client, OpenSearchSourceConfiguration> getClientRefresher();
In line with my comment elsewhere, change this to:
Supplier<Client> getClientSupplier();
This probably would not work for OpenSearchIndexPartitionCreationSupplier. I changed it to:
ClientRefresher<Client> getClientRefresher();
You can make ClientRefresher implement Supplier<C>. Then you can use a simpler interface in the majority of the code.
Unfortunately, there is some tight coupling in how the APIs are used, e.g. OpenSearchIndexPartitionCreationSupplier needs the SearchAccessor to return more than a Supplier. I improved it by propagating PluginComponentRefresher<Client, OpenSearchSourceConfiguration>.
...ain/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchClientRefresher.java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class OpenSearchClientRefresher
Use something like the following to bring together some of my comments.
public class OpenSearchClientRefresher<C>
        implements Supplier<C>, PluginComponentRefresher<C, OpenSearchSourceConfiguration>
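As a rough illustration of that declaration, here is a self-contained sketch. ComponentRefresher and SourceConfig are hypothetical stand-ins for PluginComponentRefresher and OpenSearchSourceConfiguration (the real types may have different methods), and the client factory function is an assumption made for the example.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for PluginComponentRefresher; the real interface may differ.
interface ComponentRefresher<C, CFG> {
    C get();

    void update(CFG newConfig);
}

// Hypothetical stand-in for OpenSearchSourceConfiguration.
final class SourceConfig {
    final String username;
    final String password;

    SourceConfig(final String username, final String password) {
        this.username = username;
        this.password = password;
    }
}

// One generic class serves any client type C. Callers that only read the
// client can depend on Supplier<C>; callers that trigger refreshes use the
// wider ComponentRefresher view.
class GenericClientRefresher<C> implements Supplier<C>, ComponentRefresher<C, SourceConfig> {
    private final Function<SourceConfig, C> clientFactory;
    private volatile C currentClient;

    GenericClientRefresher(final Function<SourceConfig, C> clientFactory,
                           final SourceConfig initialConfig) {
        this.clientFactory = clientFactory;
        this.currentClient = clientFactory.apply(initialConfig);
    }

    @Override
    public C get() {
        return currentClient;
    }

    @Override
    public void update(final SourceConfig newConfig) {
        // Build a new client from the refreshed credentials and publish it.
        currentClient = clientFactory.apply(newConfig);
    }
}
```

Because a single get() method satisfies both interfaces, the same instance can be passed as a Supplier<C> to the accessors and as the wider refresher type to code that needs more.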
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAccessor.class);

static final String PIT_RESOURCE_LIMIT_ERROR_TYPE = "rejected_execution_exception";
static final String INDEX_NOT_FOUND_EXCEPTION = "index_not_found_exception";
static final String SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE = "Trying to create too many scroll contexts";

private final OpenSearchClient openSearchClient;
private final OpenSearchClientRefresher openSearchClientRefresher;
Similar to my comment above, make this a Supplier<OpenSearchClient> to decouple getting the client from the fact that it is refreshed.
...java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchClientRefresherTest.java
readWriteLock.readLock().lock();
try {
    return currentClient;
} finally {
    readWriteLock.readLock().unlock();
}
Are these locks needed just in case we had multiple worker threads going at once?
Yes. When secrets are configured, at least two threads access this: the pipeline worker thread and the secret polling thread.
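For reference, the read path quoted above could pair with a write path along these lines. This is a sketch under assumptions: LockGuardedRefresher and its update method are hypothetical names, and the PR's actual update logic is not shown in this thread.

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Hypothetical refresher guarded by a read/write lock.
class LockGuardedRefresher<C> implements Supplier<C> {
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private C currentClient;

    LockGuardedRefresher(final C initialClient) {
        this.currentClient = initialClient;
    }

    // Called by the pipeline worker thread; many readers may hold the lock at once.
    @Override
    public C get() {
        readWriteLock.readLock().lock();
        try {
            return currentClient;
        } finally {
            readWriteLock.readLock().unlock();
        }
    }

    // Called by the secret polling thread; waits for readers, then swaps the client.
    void update(final C newClient) {
        readWriteLock.writeLock().lock();
        try {
            currentClient = newClient;
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }
}
```

The write lock also gives readers a happens-before guarantee, so a worker thread that calls get() after update() returns sees the new client.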
@@ -149,7 +153,8 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR
 SearchResponse<ObjectNode> searchResponse;

 try {
-searchResponse = elasticsearchClient.search(SearchRequest.of(request -> request
+searchResponse = elasticsearchClientRefresher.get()
Since this change is somewhat intrusive to the code, and we don't currently have integration tests for this source, just curious what testing you did to verify no regression?
Yes. I ran a smoke test with the following pipeline:
pipeline_configurations:
  aws:
    secrets:
      secret1:
        secret_id: my-es-secret
        region: us-east-1
        sts_role_arn: "arn:aws:iam::xxxx:role/ecs-test-role"
      secret2:
        secret_id: my-self-managed-os
        region: us-east-1
        sts_role_arn: "arn:aws:iam::xxxx:role/ecs-test-role"
opensearch-migration-pipeline:
  source:
    opensearch:
      hosts: [ "https://localhost:9200" ]
      username: "${{aws_secrets:secret2:username}}"
      password: "${{aws_secrets:secret2:password}}"
  processor:
    - date:
        from_time_received: true
  sink:
    - opensearch:
        hosts: [ "https://search-test-domain-r3dk3xkywa3vqqdzdqrddq3kyq.us-east-1.es.amazonaws.com" ]
        username: "${{aws_secrets:secret1:username}}"
        password: "${{aws_secrets:secret1:password}}"
        index: "test-migration"
@@ -59,7 +63,8 @@ private void startProcess(final OpenSearchSourceConfiguration openSearchSourceCo

 final OpenSearchClientFactory openSearchClientFactory = OpenSearchClientFactory.create(awsCredentialsSupplier);
 final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics = OpenSearchSourcePluginMetrics.create(pluginMetrics);
-final SearchAccessorStrategy searchAccessorStrategy = SearchAccessorStrategy.create(openSearchSourceConfiguration, openSearchClientFactory);
+final SearchAccessorStrategy searchAccessorStrategy = SearchAccessorStrategy.create(
+        openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable);
It looks like the code assumes that the pluginConfigObservable is always non-null, is that right? Is OpenSearchSource without key refresh not allowed anymore?
It is indeed always non-null. If no secrets are used, the PluginConfigObservable is still instantiated and loaded with the PluginConfigObserver, but the PluginConfigPublisher never notifies it, i.e. PluginConfigObservable::update is never invoked. At the client level, this means there is only read access to the client, although there is a bit of overhead from acquiring the read lock.
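The behaviour described here can be illustrated with a small observer sketch. ConfigObserver, ConfigObservable, and ObservingClientHolder are hypothetical names rather than the Data Prepper types, and the "client" is reduced to a String for brevity.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for PluginConfigObserver; the real interface may differ.
interface ConfigObserver<CFG> {
    void update(CFG newConfig);
}

// Hypothetical stand-in for PluginConfigObservable.
class ConfigObservable<CFG> {
    private final List<ConfigObserver<CFG>> observers = new CopyOnWriteArrayList<>();

    void addObserver(final ConfigObserver<CFG> observer) {
        observers.add(observer);
    }

    // Invoked only when a publisher reports a change, e.g. a rotated secret.
    void update(final CFG newConfig) {
        for (final ConfigObserver<CFG> observer : observers) {
            observer.update(newConfig);
        }
    }
}

// Hypothetical holder that rebuilds its "client" on each update and counts refreshes.
class ObservingClientHolder implements ConfigObserver<String> {
    private final AtomicInteger refreshCount = new AtomicInteger();
    private volatile String client;

    ObservingClientHolder(final String initialPassword) {
        this.client = "client(" + initialPassword + ")";
    }

    @Override
    public void update(final String newPassword) {
        client = "client(" + newPassword + ")";
        refreshCount.incrementAndGet();
    }

    String get() {
        return client;
    }

    int refreshCount() {
        return refreshCount.get();
    }
}
```

When nothing ever calls ConfigObservable.update, the registered holder keeps serving its initial client and the refresh count stays at zero, which mirrors the no-secrets case.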
We can still improve this code, perhaps after the 2.5 release
ENH: opensearch source secrets refreshment suppport (#3437) Signed-off-by: George Chen <[email protected]> (cherry picked from commit 0d2e491)
ENH: opensearch source secrets refreshment suppport (#3437) Signed-off-by: George Chen <[email protected]>
Description
This PR integrates secrets refreshment with OpenSearch Source clients.
Issues Resolved
Resolves #3436
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.