-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prometheus Sink draft code for issue #1744. #3103
Merged
dlvenable
merged 32 commits into
opensearch-project:main
from
mallikagogoi7:feature-prometheus-sink-draft
Dec 11, 2023
Merged
Changes from 23 commits
Commits
Show all changes
32 commits
Select commit
Hold shift + click to select a range
a166841
Prometheus Sink draft code for issue #1744.
mallikagogoi7 f6c6177
Merge branch 'main' into feature-prometheus-sink-draft
mallikagogoi7 078228b
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 bcca0a4
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 8882a83
Added metrics name in Prometheus Sink for issue #1744.
mallikagogoi7 51e7f44
Added junit test for Prometheus Sink for issue #1744.
mallikagogoi7 228991d
Added test cases and README for Prometheus Sink for issue #1744.
mallikagogoi7 7b5f1c4
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 3d3cca6
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 f575689
Incorporated review comments for issue #1744.
mallikagogoi7 c9692a8
Resolved review comments for issue #1744.
mallikagogoi7 225f931
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 cbeb4b6
Fixed Failed Response code for issue #1744.
mallikagogoi7 e9ea0c2
Fixed Failed Response code for issue #1744.
mallikagogoi7 af83962
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 cbbc511
Review Comments Resolved for issue #1744.
mallikagogoi7 7693d9d
AwsSigv4 Code added for #1744.
mallikagogoi7 8e1d7f5
AwsSigv4 Code Bug resolved for #1744.
mallikagogoi7 478a495
AwsSigv4 Code Bug Fix for #1744.
mallikagogoi7 a671ad7
README file updated for #1744.
mallikagogoi7 e4401bd
Bug fixed for #1744.
mallikagogoi7 869db0c
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 1a8cb7e
Fix is date for #1744.
mallikagogoi7 3e79395
Merge conflict resolved for issue #1744.
mallikagogoi7 c59cd3d
Review comment resolved for #1744.
mallikagogoi7 b7f95be
Added insecure changes for #1744.
mallikagogoi7 4947506
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 b0f2737
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 b3b9cec
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 1db7b44
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 d9248f8
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 9710415
Insecure Skip Verify resolved for #1744.
mallikagogoi7 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
232 changes: 232 additions & 0 deletions
232
.../src/main/java/org/opensearch/dataprepper/aws/api/AwsRequestSigningApacheInterceptor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,232 @@ | ||
/* | ||
* Copyright OpenSearch Contributors. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with | ||
* the License. A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR | ||
* CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions | ||
* and limitations under the License. | ||
*/ | ||
package org.opensearch.dataprepper.aws.api; | ||
|
||
import com.amazonaws.auth.AWSCredentials; | ||
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; | ||
import org.apache.hc.core5.http.HttpRequest; | ||
import org.apache.hc.core5.http.ClassicHttpRequest; | ||
import org.apache.hc.core5.http.EntityDetails; | ||
import org.apache.hc.core5.http.Header; | ||
import org.apache.hc.core5.http.NameValuePair; | ||
import org.apache.hc.core5.http.HttpHost; | ||
import org.apache.hc.core5.http.HttpRequestInterceptor; | ||
import org.apache.hc.core5.http.message.BasicHeader; | ||
import org.apache.hc.core5.http.protocol.HttpContext; | ||
import org.apache.hc.core5.net.URIBuilder; | ||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; | ||
import software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute; | ||
import software.amazon.awssdk.core.interceptor.ExecutionAttributes; | ||
import software.amazon.awssdk.core.signer.Signer; | ||
import software.amazon.awssdk.http.SdkHttpFullRequest; | ||
import software.amazon.awssdk.http.SdkHttpMethod; | ||
import software.amazon.awssdk.regions.Region; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.net.URI; | ||
import java.net.URISyntaxException; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.TreeMap; | ||
|
||
import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST; | ||
|
||
/** | ||
* An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer} | ||
* and {@link AwsCredentialsProvider}. | ||
*/ | ||
public final class AwsRequestSigningApacheInterceptor implements HttpRequestInterceptor { | ||
|
||
/** | ||
* Constant to check content-length | ||
*/ | ||
private static final String CONTENT_LENGTH = "content-length"; | ||
/** | ||
* Constant to check Zero content length | ||
*/ | ||
private static final String ZERO_CONTENT_LENGTH = "0"; | ||
/** | ||
* Constant to check if host is the endpoint | ||
*/ | ||
private static final String HOST = "host"; | ||
|
||
/** | ||
* The service that we're connecting to. | ||
*/ | ||
private final String service; | ||
|
||
/** | ||
* The particular signer implementation. | ||
*/ | ||
private final Signer signer; | ||
|
||
/** | ||
* The source of AWS credentials for signing. | ||
*/ | ||
private final AwsCredentialsProvider awsCredentialsProvider; | ||
|
||
/** | ||
* The region signing region. | ||
*/ | ||
private final Region region; | ||
|
||
/** | ||
* | ||
* @param service service that we're connecting to | ||
* @param signer particular signer implementation | ||
* @param awsCredentialsProvider source of AWS credentials for signing | ||
* @param region signing region | ||
*/ | ||
public AwsRequestSigningApacheInterceptor(final String service, | ||
final Signer signer, | ||
final AwsCredentialsProvider awsCredentialsProvider, | ||
final Region region) { | ||
this.service = Objects.requireNonNull(service); | ||
this.signer = Objects.requireNonNull(signer); | ||
this.awsCredentialsProvider = Objects.requireNonNull(awsCredentialsProvider); | ||
this.region = Objects.requireNonNull(region); | ||
} | ||
|
||
/** | ||
* | ||
* @param service service that we're connecting to | ||
* @param signer particular signer implementation | ||
* @param awsCredentialsProvider source of AWS credentials for signing | ||
* @param region signing region | ||
*/ | ||
public AwsRequestSigningApacheInterceptor(final String service, | ||
final Signer signer, | ||
final AwsCredentialsProvider awsCredentialsProvider, | ||
final String region) { | ||
this(service, signer, awsCredentialsProvider, Region.of(region)); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public void process(final HttpRequest request, final EntityDetails entity, final HttpContext context) | ||
throws IOException { | ||
URIBuilder uriBuilder; | ||
try { | ||
uriBuilder = new URIBuilder(request.getUri()); | ||
} catch (URISyntaxException e) { | ||
throw new IOException("Invalid URI", e); | ||
} | ||
|
||
// Copy Apache HttpRequest to AWS Request | ||
SdkHttpFullRequest.Builder requestBuilder = SdkHttpFullRequest.builder() | ||
.method(SdkHttpMethod.fromValue(request.getMethod())) | ||
.uri(buildUri(context, uriBuilder)); | ||
|
||
if (request instanceof ClassicHttpRequest) { | ||
ClassicHttpRequest classicHttpRequest = | ||
(ClassicHttpRequest) request; | ||
if (classicHttpRequest.getEntity() != null) { | ||
InputStream content = classicHttpRequest.getEntity().getContent(); | ||
requestBuilder.contentStreamProvider(() -> content); | ||
} | ||
} | ||
requestBuilder.rawQueryParameters(nvpToMapParams(uriBuilder.getQueryParams())); | ||
requestBuilder.headers(headerArrayToMap(request.getHeaders())); | ||
|
||
AWSCredentials credentials = new DefaultAWSCredentialsProviderChain().getCredentials(); | ||
ExecutionAttributes attributes = new ExecutionAttributes(); | ||
attributes.putAttribute(AwsSignerExecutionAttribute.AWS_CREDENTIALS, awsCredentialsProvider.resolveCredentials()); | ||
attributes.putAttribute(AwsSignerExecutionAttribute.SERVICE_SIGNING_NAME, service); | ||
attributes.putAttribute(AwsSignerExecutionAttribute.SIGNING_REGION, region); | ||
|
||
// Sign it | ||
SdkHttpFullRequest signedRequest = signer.sign(requestBuilder.build(), attributes); | ||
|
||
// Now copy everything back | ||
request.setHeaders(mapToHeaderArray(signedRequest.headers())); | ||
} | ||
|
||
private URI buildUri(final HttpContext context, URIBuilder uriBuilder) throws IOException { | ||
try { | ||
HttpHost host = (HttpHost) context.getAttribute(HTTP_TARGET_HOST); | ||
|
||
if (host != null) { | ||
uriBuilder.setHost(host.getHostName()); | ||
uriBuilder.setScheme(host.getSchemeName()); | ||
uriBuilder.setPort(host.getPort()); | ||
} | ||
|
||
return uriBuilder.build(); | ||
} catch (URISyntaxException e) { | ||
throw new IOException("Invalid URI", e); | ||
} | ||
} | ||
|
||
/** | ||
* | ||
* @param params list of HTTP query params as NameValuePairs | ||
* @return a multimap of HTTP query params | ||
*/ | ||
private static Map<String, List<String>> nvpToMapParams(final List<NameValuePair> params) { | ||
Map<String, List<String>> parameterMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); | ||
for (NameValuePair nvp : params) { | ||
List<String> argsList = | ||
parameterMap.computeIfAbsent(nvp.getName(), k -> new ArrayList<>()); | ||
argsList.add(nvp.getValue()); | ||
} | ||
return parameterMap; | ||
} | ||
|
||
/** | ||
* @param headers modelled Header objects | ||
* @return a Map of header entries | ||
*/ | ||
private static Map<String, List<String>> headerArrayToMap(final Header[] headers) { | ||
Map<String, List<String>> headersMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); | ||
for (Header header : headers) { | ||
if (!skipHeader(header)) { | ||
headersMap.put(header.getName(), headersMap | ||
.getOrDefault(header.getName(), | ||
new LinkedList<>(Collections.singletonList(header.getValue())))); | ||
} | ||
} | ||
return headersMap; | ||
} | ||
|
||
/** | ||
* @param header header line to check | ||
* @return true if the given header should be excluded when signing | ||
*/ | ||
private static boolean skipHeader(final Header header) { | ||
return (CONTENT_LENGTH.equalsIgnoreCase(header.getName()) | ||
&& ZERO_CONTENT_LENGTH.equals(header.getValue())) // Strip Content-Length: 0 | ||
|| HOST.equalsIgnoreCase(header.getName()); // Host comes from endpoint | ||
} | ||
|
||
/** | ||
* @param mapHeaders Map of header entries | ||
* @return modelled Header objects | ||
*/ | ||
private static Header[] mapToHeaderArray(final Map<String, List<String>> mapHeaders) { | ||
Header[] headers = new Header[mapHeaders.size()]; | ||
int i = 0; | ||
for (Map.Entry<String, List<String>> headerEntry : mapHeaders.entrySet()) { | ||
for (String value : headerEntry.getValue()) { | ||
headers[i++] = new BasicHeader(headerEntry.getKey(), value); | ||
} | ||
} | ||
return headers; | ||
} | ||
} |
96 changes: 96 additions & 0 deletions
96
...in/java/org/opensearch/dataprepper/plugins/certificate/s3/CertificateProviderFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.dataprepper.plugins.certificate.s3; | ||
|
||
import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; | ||
import org.opensearch.dataprepper.plugins.certificate.acm.ACMCertificateProvider; | ||
import org.opensearch.dataprepper.plugins.certificate.file.FileCertificateProvider; | ||
import org.opensearch.dataprepper.plugins.metricpublisher.MicrometerMetricPublisher; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; | ||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain; | ||
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; | ||
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; | ||
import software.amazon.awssdk.core.retry.RetryMode; | ||
import software.amazon.awssdk.regions.Region; | ||
import software.amazon.awssdk.services.acm.AcmClient; | ||
import software.amazon.awssdk.services.s3.S3Client; | ||
|
||
/** | ||
* This class consist logic for downloading the SSL certificates from S3/ACM/Local file. | ||
* | ||
*/ | ||
public class CertificateProviderFactory { | ||
private static final Logger LOG = LoggerFactory.getLogger(CertificateProviderFactory.class); | ||
|
||
private final boolean useAcmCertForSSL; | ||
|
||
private final Region awsRegion; | ||
|
||
private final String acmCertificateArn; | ||
|
||
private final long acmCertIssueTimeOutMillis; | ||
|
||
private final String acmPrivateKeyPassword; | ||
|
||
private final boolean isSslCertAndKeyFileInS3; | ||
|
||
private final String sslCertificateFile; | ||
|
||
private final String sslKeyFile; | ||
|
||
public CertificateProviderFactory(final boolean useAcmCertForSSL, final Region awsRegion, final String acmCertificateArn, | ||
final long acmCertIssueTimeOutMillis, final String acmPrivateKeyPassword, | ||
final boolean isSslCertAndKeyFileInS3, final String sslCertificateFile, final String sslKeyFile) { | ||
this.useAcmCertForSSL = useAcmCertForSSL; | ||
this.awsRegion = awsRegion; | ||
this.acmCertificateArn = acmCertificateArn; | ||
this.acmCertIssueTimeOutMillis = acmCertIssueTimeOutMillis; | ||
this.acmPrivateKeyPassword = acmPrivateKeyPassword; | ||
this.isSslCertAndKeyFileInS3 = isSslCertAndKeyFileInS3; | ||
this.sslCertificateFile = sslCertificateFile; | ||
this.sslKeyFile = sslKeyFile; | ||
} | ||
|
||
/** | ||
* This method consist logic for downloading the SSL certificates from S3/ACM/Local file. | ||
* @return CertificateProvider | ||
*/ | ||
public CertificateProvider getCertificateProvider() { | ||
if (useAcmCertForSSL) { | ||
LOG.info("Using ACM certificate and private key for SSL/TLS."); | ||
final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder() | ||
.addCredentialsProvider(DefaultCredentialsProvider.create()).build(); | ||
final ClientOverrideConfiguration clientConfig = ClientOverrideConfiguration.builder() | ||
.retryPolicy(RetryMode.STANDARD) | ||
.build(); | ||
|
||
final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws"); | ||
|
||
final AcmClient awsCertificateManager = AcmClient.builder() | ||
.region(awsRegion) | ||
.credentialsProvider(credentialsProvider) | ||
.overrideConfiguration(clientConfig) | ||
.overrideConfiguration(metricPublisher -> metricPublisher.addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics))) | ||
.build(); | ||
|
||
return new ACMCertificateProvider(awsCertificateManager, acmCertificateArn, acmCertIssueTimeOutMillis, acmPrivateKeyPassword); | ||
} else if (isSslCertAndKeyFileInS3) { | ||
LOG.info("Using S3 to fetch certificate and private key for SSL/TLS."); | ||
final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder() | ||
.addCredentialsProvider(DefaultCredentialsProvider.create()).build(); | ||
final S3Client s3Client = S3Client.builder() | ||
.region(awsRegion) | ||
.credentialsProvider(credentialsProvider) | ||
.build(); | ||
return new S3CertificateProvider(s3Client, sslCertificateFile, sslKeyFile); | ||
} else { | ||
LOG.info("Using local file system to get certificate and private key for SSL/TLS."); | ||
return new FileCertificateProvider(sslCertificateFile, sslKeyFile); | ||
} | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This project has 100% coverage and can retain that. Dropping this to 10% will prevent us from ensuring that code coverage exists within the project. We should test the
AwsRequestSigningApacheInterceptor
. I believe that a different PR may be doing something similar here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dlvenable Resolved