Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus Sink draft code for issue #1744. #3103

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
a166841
Prometheus Sink draft code for issue #1744.
mallikagogoi7 Aug 2, 2023
f6c6177
Merge branch 'main' into feature-prometheus-sink-draft
mallikagogoi7 Aug 4, 2023
078228b
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 Aug 7, 2023
bcca0a4
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 Aug 8, 2023
8882a83
Added metrics name in Prometheus Sink for issue #1744.
mallikagogoi7 Aug 8, 2023
51e7f44
Added junit test for Prometheus Sink for issue #1744.
mallikagogoi7 Aug 9, 2023
228991d
Added test cases and README for Prometheus Sink for issue #1744.
mallikagogoi7 Aug 10, 2023
7b5f1c4
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 Aug 10, 2023
3d3cca6
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 Aug 17, 2023
f575689
Incorporated review comments for issue #1744.
mallikagogoi7 Aug 17, 2023
c9692a8
Resolved review comments for issue #1744.
mallikagogoi7 Aug 18, 2023
225f931
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 Aug 21, 2023
cbeb4b6
Fixed Failed Response code for issue #1744.
mallikagogoi7 Aug 21, 2023
e9ea0c2
Fixed Failed Response code for issue #1744.
mallikagogoi7 Aug 21, 2023
af83962
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 Aug 25, 2023
cbbc511
Review Comments Resolved for issue #1744.
mallikagogoi7 Aug 25, 2023
7693d9d
AwsSigv4 Code added for #1744.
mallikagogoi7 Aug 28, 2023
8e1d7f5
AwsSigv4 Code Bug resolved for #1744.
mallikagogoi7 Aug 28, 2023
478a495
AwsSigv4 Code Bug Fix for #1744.
mallikagogoi7 Aug 28, 2023
a671ad7
README file updated for #1744.
mallikagogoi7 Aug 28, 2023
e4401bd
Bug fixed for #1744.
mallikagogoi7 Aug 31, 2023
869db0c
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 Aug 31, 2023
1a8cb7e
Fix is date for #1744.
mallikagogoi7 Sep 1, 2023
3e79395
Merge conflict resolved for issue #1744.
mallikagogoi7 Sep 8, 2023
c59cd3d
Review comment resolved for #1744.
mallikagogoi7 Sep 8, 2023
b7f95be
Added insecure changes for #1744.
mallikagogoi7 Sep 12, 2023
4947506
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 Sep 12, 2023
b0f2737
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 Sep 18, 2023
b3b9cec
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 Sep 20, 2023
1db7b44
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 Oct 3, 2023
d9248f8
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
mallikagogoi7 Oct 11, 2023
9710415
Insecure Skip Verify resolved for #1744.
mallikagogoi7 Oct 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions data-prepper-plugins/aws-plugin-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
dependencies {
implementation 'software.amazon.awssdk:auth'
implementation 'software.amazon.awssdk:apache-client'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.2'
implementation 'com.amazonaws:aws-java-sdk-sts:1.12.395'
}

test {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* Copyright OpenSearch Contributors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
* the License. A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.opensearch.dataprepper.aws.api;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequestInterceptor;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.net.URIBuilder;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.signer.Signer;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.regions.Region;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST;

/**
* An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer}
* and {@link AwsCredentialsProvider}.
*/
public final class AwsRequestSigningApacheInterceptor implements HttpRequestInterceptor {

/**
* Constant to check content-length
*/
private static final String CONTENT_LENGTH = "content-length";
/**
* Constant to check Zero content length
*/
private static final String ZERO_CONTENT_LENGTH = "0";
/**
* Constant to check if host is the endpoint
*/
private static final String HOST = "host";

/**
* The service that we're connecting to.
*/
private final String service;

/**
* The particular signer implementation.
*/
private final Signer signer;

/**
* The source of AWS credentials for signing.
*/
private final AwsCredentialsProvider awsCredentialsProvider;

/**
* The region signing region.
*/
private final Region region;

/**
*
* @param service service that we're connecting to
* @param signer particular signer implementation
* @param awsCredentialsProvider source of AWS credentials for signing
* @param region signing region
*/
public AwsRequestSigningApacheInterceptor(final String service,
final Signer signer,
final AwsCredentialsProvider awsCredentialsProvider,
final Region region) {
this.service = Objects.requireNonNull(service);
this.signer = Objects.requireNonNull(signer);
this.awsCredentialsProvider = Objects.requireNonNull(awsCredentialsProvider);
this.region = Objects.requireNonNull(region);
}

/**
*
* @param service service that we're connecting to
* @param signer particular signer implementation
* @param awsCredentialsProvider source of AWS credentials for signing
* @param region signing region
*/
public AwsRequestSigningApacheInterceptor(final String service,
final Signer signer,
final AwsCredentialsProvider awsCredentialsProvider,
final String region) {
this(service, signer, awsCredentialsProvider, Region.of(region));
}

/**
* {@inheritDoc}
*/
@Override
public void process(final HttpRequest request, final EntityDetails entity, final HttpContext context)
throws IOException {
URIBuilder uriBuilder;
try {
uriBuilder = new URIBuilder(request.getUri());
} catch (URISyntaxException e) {
throw new IOException("Invalid URI", e);
}

// Copy Apache HttpRequest to AWS Request
SdkHttpFullRequest.Builder requestBuilder = SdkHttpFullRequest.builder()
.method(SdkHttpMethod.fromValue(request.getMethod()))
.uri(buildUri(context, uriBuilder));

if (request instanceof ClassicHttpRequest) {
ClassicHttpRequest classicHttpRequest =
(ClassicHttpRequest) request;
if (classicHttpRequest.getEntity() != null) {
InputStream content = classicHttpRequest.getEntity().getContent();
requestBuilder.contentStreamProvider(() -> content);
}
}
requestBuilder.rawQueryParameters(nvpToMapParams(uriBuilder.getQueryParams()));
requestBuilder.headers(headerArrayToMap(request.getHeaders()));

AWSCredentials credentials = new DefaultAWSCredentialsProviderChain().getCredentials();
ExecutionAttributes attributes = new ExecutionAttributes();
attributes.putAttribute(AwsSignerExecutionAttribute.AWS_CREDENTIALS, awsCredentialsProvider.resolveCredentials());
attributes.putAttribute(AwsSignerExecutionAttribute.SERVICE_SIGNING_NAME, service);
attributes.putAttribute(AwsSignerExecutionAttribute.SIGNING_REGION, region);

// Sign it
SdkHttpFullRequest signedRequest = signer.sign(requestBuilder.build(), attributes);

// Now copy everything back
request.setHeaders(mapToHeaderArray(signedRequest.headers()));
}

private URI buildUri(final HttpContext context, URIBuilder uriBuilder) throws IOException {
try {
HttpHost host = (HttpHost) context.getAttribute(HTTP_TARGET_HOST);

if (host != null) {
uriBuilder.setHost(host.getHostName());
uriBuilder.setScheme(host.getSchemeName());
uriBuilder.setPort(host.getPort());
}

return uriBuilder.build();
} catch (URISyntaxException e) {
throw new IOException("Invalid URI", e);
}
}

/**
*
* @param params list of HTTP query params as NameValuePairs
* @return a multimap of HTTP query params
*/
private static Map<String, List<String>> nvpToMapParams(final List<NameValuePair> params) {
Map<String, List<String>> parameterMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (NameValuePair nvp : params) {
List<String> argsList =
parameterMap.computeIfAbsent(nvp.getName(), k -> new ArrayList<>());
argsList.add(nvp.getValue());
}
return parameterMap;
}

/**
* @param headers modelled Header objects
* @return a Map of header entries
*/
private static Map<String, List<String>> headerArrayToMap(final Header[] headers) {
Map<String, List<String>> headersMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (Header header : headers) {
if (!skipHeader(header)) {
headersMap.put(header.getName(), headersMap
.getOrDefault(header.getName(),
new LinkedList<>(Collections.singletonList(header.getValue()))));
}
}
return headersMap;
}

/**
* @param header header line to check
* @return true if the given header should be excluded when signing
*/
private static boolean skipHeader(final Header header) {
return (CONTENT_LENGTH.equalsIgnoreCase(header.getName())
&& ZERO_CONTENT_LENGTH.equals(header.getValue())) // Strip Content-Length: 0
|| HOST.equalsIgnoreCase(header.getName()); // Host comes from endpoint
}

/**
* @param mapHeaders Map of header entries
* @return modelled Header objects
*/
private static Header[] mapToHeaderArray(final Map<String, List<String>> mapHeaders) {
Header[] headers = new Header[mapHeaders.size()];
int i = 0;
for (Map.Entry<String, List<String>> headerEntry : mapHeaders.entrySet()) {
for (String value : headerEntry.getValue()) {
headers[i++] = new BasicHeader(headerEntry.getKey(), value);
}
}
return headers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.certificate.s3;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.acm.ACMCertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.file.FileCertificateProvider;
import org.opensearch.dataprepper.plugins.metricpublisher.MicrometerMetricPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.acm.AcmClient;
import software.amazon.awssdk.services.s3.S3Client;

/**
* This class consist logic for downloading the SSL certificates from S3/ACM/Local file.
*
*/
public class CertificateProviderFactory {
private static final Logger LOG = LoggerFactory.getLogger(CertificateProviderFactory.class);

private final boolean useAcmCertForSSL;

private final Region awsRegion;

private final String acmCertificateArn;

private final long acmCertIssueTimeOutMillis;

private final String acmPrivateKeyPassword;

private final boolean isSslCertAndKeyFileInS3;

private final String sslCertificateFile;

private final String sslKeyFile;

public CertificateProviderFactory(final boolean useAcmCertForSSL, final Region awsRegion, final String acmCertificateArn,
final long acmCertIssueTimeOutMillis, final String acmPrivateKeyPassword,
final boolean isSslCertAndKeyFileInS3, final String sslCertificateFile, final String sslKeyFile) {
this.useAcmCertForSSL = useAcmCertForSSL;
this.awsRegion = awsRegion;
this.acmCertificateArn = acmCertificateArn;
this.acmCertIssueTimeOutMillis = acmCertIssueTimeOutMillis;
this.acmPrivateKeyPassword = acmPrivateKeyPassword;
this.isSslCertAndKeyFileInS3 = isSslCertAndKeyFileInS3;
this.sslCertificateFile = sslCertificateFile;
this.sslKeyFile = sslKeyFile;
}

/**
* This method consist logic for downloading the SSL certificates from S3/ACM/Local file.
* @return CertificateProvider
*/
public CertificateProvider getCertificateProvider() {
if (useAcmCertForSSL) {
LOG.info("Using ACM certificate and private key for SSL/TLS.");
final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder()
.addCredentialsProvider(DefaultCredentialsProvider.create()).build();
final ClientOverrideConfiguration clientConfig = ClientOverrideConfiguration.builder()
.retryPolicy(RetryMode.STANDARD)
.build();

final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws");

final AcmClient awsCertificateManager = AcmClient.builder()
.region(awsRegion)
.credentialsProvider(credentialsProvider)
.overrideConfiguration(clientConfig)
.overrideConfiguration(metricPublisher -> metricPublisher.addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics)))
.build();

return new ACMCertificateProvider(awsCertificateManager, acmCertificateArn, acmCertIssueTimeOutMillis, acmPrivateKeyPassword);
} else if (isSslCertAndKeyFileInS3) {
LOG.info("Using S3 to fetch certificate and private key for SSL/TLS.");
final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder()
.addCredentialsProvider(DefaultCredentialsProvider.create()).build();
final S3Client s3Client = S3Client.builder()
.region(awsRegion)
.credentialsProvider(credentialsProvider)
.build();
return new S3CertificateProvider(s3Client, sslCertificateFile, sslKeyFile);
} else {
LOG.info("Using local file system to get certificate and private key for SSL/TLS.");
return new FileCertificateProvider(sslCertificateFile, sslKeyFile);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.certificate.s3;

import org.hamcrest.core.IsInstanceOf;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.acm.ACMCertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.file.FileCertificateProvider;
import software.amazon.awssdk.regions.Region;

import static org.hamcrest.MatcherAssert.assertThat;

class CertificateProviderFactoryTest {
private final String TEST_SSL_CERTIFICATE_FILE = getClass().getClassLoader().getResource("test_cert.crt").getFile();
private final String TEST_SSL_KEY_FILE = getClass().getClassLoader().getResource("test_decrypted_key.key").getFile();

private CertificateProviderFactory certificateProviderFactory;

@Test
void getCertificateProviderFileCertificateProviderSuccess() {
certificateProviderFactory = new CertificateProviderFactory(false, Region.of("us-east-1"),
"arn:aws:acm:us-east-1:account:certificate/1234-567-856456", 5L, "test", false, TEST_SSL_CERTIFICATE_FILE, TEST_SSL_KEY_FILE);

final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();

assertThat(certificateProvider, IsInstanceOf.instanceOf(FileCertificateProvider.class));
}

@Test
void getCertificateProviderS3ProviderSuccess() {

certificateProviderFactory = new CertificateProviderFactory(false, Region.of("us-east-1"),
"arn:aws:acm:us-east-1:account:certificate/1234-567-856456", 5L, "test", true, TEST_SSL_CERTIFICATE_FILE, TEST_SSL_KEY_FILE);

final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();

assertThat(certificateProvider, IsInstanceOf.instanceOf(S3CertificateProvider.class));
}

@Test
void getCertificateProviderAcmProviderSuccess() {
certificateProviderFactory = new CertificateProviderFactory(true, Region.of("us-east-1"),
"arn:aws:acm:us-east-1:account:certificate/1234-567-856456", 5L, "test", false, TEST_SSL_CERTIFICATE_FILE, TEST_SSL_KEY_FILE);
final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();

assertThat(certificateProvider, IsInstanceOf.instanceOf(ACMCertificateProvider.class));
}
}
Loading
Loading