Skip to content

Commit

Permalink
Incorporated review comments for issue #1744.
Browse files Browse the repository at this point in the history
Signed-off-by: mallikagogoi7 <[email protected]>
  • Loading branch information
mallikagogoi7 committed Aug 17, 2023
1 parent 3d3cca6 commit f575689
Show file tree
Hide file tree
Showing 15 changed files with 211 additions and 263 deletions.
3 changes: 2 additions & 1 deletion data-prepper-plugins/aws-plugin-api/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

dependencies {
implementation 'software.amazon.awssdk:auth'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.2'
}

test {
Expand All @@ -12,7 +13,7 @@ jacocoTestCoverageVerification {
violationRules {
rule {
limit {
minimum = 1.0
minimum = 0.1
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.opensearch.dataprepper.plugins.sink.prometheus;
package org.opensearch.dataprepper.aws.api;

import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.ClassicHttpRequest;
Expand Down Expand Up @@ -45,8 +45,6 @@
import java.util.Objects;
import java.util.TreeMap;

import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST;

/**
* An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer}
* and {@link AwsCredentialsProvider}.
Expand All @@ -66,6 +64,11 @@ public final class AwsRequestSigningApacheInterceptor implements HttpRequestInte
*/
private static final String HOST = "host";

/**
* Attribute name of a HttpHost object that represents the connection target.
*/
private static final String HTTP_TARGET_HOST = "http.target_host";

/**
* The service that we're connecting to.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,21 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink.prometheus.certificate;
package org.opensearch.dataprepper.plugins.certificate.s3;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.acm.ACMCertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.file.FileCertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.s3.S3CertificateProvider;
import org.opensearch.dataprepper.plugins.metricpublisher.MicrometerMetricPublisher;
import org.opensearch.dataprepper.plugins.sink.prometheus.configuration.PrometheusSinkConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.acm.AcmClient;
import software.amazon.awssdk.services.s3.S3Client;

Expand All @@ -28,17 +27,41 @@
public class CertificateProviderFactory {
private static final Logger LOG = LoggerFactory.getLogger(CertificateProviderFactory.class);

final PrometheusSinkConfiguration prometheusSinkConfiguration;
public CertificateProviderFactory(final PrometheusSinkConfiguration prometheusSinkConfiguration) {
this.prometheusSinkConfiguration = prometheusSinkConfiguration;
private final boolean useAcmCertForSSL;

private final Region awsRegion;

private final String acmCertificateArn;

private final long acmCertIssueTimeOutMillis;

private final String acmPrivateKeyPassword;

private final boolean isSslCertAndKeyFileInS3;

private final String sslCertificateFile;

private final String sslKeyFile;

public CertificateProviderFactory(final boolean useAcmCertForSSL, final Region awsRegion, final String acmCertificateArn,
final long acmCertIssueTimeOutMillis, final String acmPrivateKeyPassword,
final boolean isSslCertAndKeyFileInS3, final String sslCertificateFile, final String sslKeyFile) {
this.useAcmCertForSSL = useAcmCertForSSL;
this.awsRegion = awsRegion;
this.acmCertificateArn = acmCertificateArn;
this.acmCertIssueTimeOutMillis = acmCertIssueTimeOutMillis;
this.acmPrivateKeyPassword = acmPrivateKeyPassword;
this.isSslCertAndKeyFileInS3 = isSslCertAndKeyFileInS3;
this.sslCertificateFile = sslCertificateFile;
this.sslKeyFile = sslKeyFile;
}

/**
* This method consist logic for downloading the SSL certificates from S3/ACM/Local file.
* @return CertificateProvider
*/
public CertificateProvider getCertificateProvider() {
if (prometheusSinkConfiguration.useAcmCertForSSL()) {
if (useAcmCertForSSL) {
LOG.info("Using ACM certificate and private key for SSL/TLS.");
final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder()
.addCredentialsProvider(DefaultCredentialsProvider.create()).build();
Expand All @@ -49,28 +72,25 @@ public CertificateProvider getCertificateProvider() {
final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws");

final AcmClient awsCertificateManager = AcmClient.builder()
.region(prometheusSinkConfiguration.getAwsAuthenticationOptions().getAwsRegion())
.region(awsRegion)
.credentialsProvider(credentialsProvider)
.overrideConfiguration(clientConfig)
.overrideConfiguration(metricPublisher -> metricPublisher.addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics)))
.build();

return new ACMCertificateProvider(awsCertificateManager, prometheusSinkConfiguration.getAcmCertificateArn(),
prometheusSinkConfiguration.getAcmCertIssueTimeOutMillis(), prometheusSinkConfiguration.getAcmPrivateKeyPassword());
} else if (prometheusSinkConfiguration.isSslCertAndKeyFileInS3()) {
return new ACMCertificateProvider(awsCertificateManager, acmCertificateArn, acmCertIssueTimeOutMillis, acmPrivateKeyPassword);
} else if (isSslCertAndKeyFileInS3) {
LOG.info("Using S3 to fetch certificate and private key for SSL/TLS.");
final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder()
.addCredentialsProvider(DefaultCredentialsProvider.create()).build();
final S3Client s3Client = S3Client.builder()
.region(prometheusSinkConfiguration.getAwsAuthenticationOptions().getAwsRegion())
.region(awsRegion)
.credentialsProvider(credentialsProvider)
.build();
return new S3CertificateProvider(s3Client,
prometheusSinkConfiguration.getSslCertificateFile(),
prometheusSinkConfiguration.getSslKeyFile());
return new S3CertificateProvider(s3Client, sslCertificateFile, sslKeyFile);
} else {
LOG.info("Using local file system to get certificate and private key for SSL/TLS.");
return new FileCertificateProvider(prometheusSinkConfiguration.getSslCertificateFile(), prometheusSinkConfiguration.getSslKeyFile());
return new FileCertificateProvider(sslCertificateFile, sslKeyFile);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.certificate.s3;

import org.hamcrest.core.IsInstanceOf;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.acm.ACMCertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.file.FileCertificateProvider;
import software.amazon.awssdk.regions.Region;

import static org.hamcrest.MatcherAssert.assertThat;

class CertificateProviderFactoryTest {
private final String TEST_SSL_CERTIFICATE_FILE = getClass().getClassLoader().getResource("test_cert.crt").getFile();
private final String TEST_SSL_KEY_FILE = getClass().getClassLoader().getResource("test_decrypted_key.key").getFile();

private CertificateProviderFactory certificateProviderFactory;

@Test
void getCertificateProviderFileCertificateProviderSuccess() {
certificateProviderFactory = new CertificateProviderFactory(false, Region.of("us-east-1"),
"arn:aws:acm:us-east-1:account:certificate/1234-567-856456", 5L, "test", false, TEST_SSL_CERTIFICATE_FILE, TEST_SSL_KEY_FILE);

final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();

assertThat(certificateProvider, IsInstanceOf.instanceOf(FileCertificateProvider.class));
}

@Test
void getCertificateProviderS3ProviderSuccess() {

certificateProviderFactory = new CertificateProviderFactory(false, Region.of("us-east-1"),
"arn:aws:acm:us-east-1:account:certificate/1234-567-856456", 5L, "test", true, TEST_SSL_CERTIFICATE_FILE, TEST_SSL_KEY_FILE);

final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();

assertThat(certificateProvider, IsInstanceOf.instanceOf(S3CertificateProvider.class));
}

@Test
void getCertificateProviderAcmProviderSuccess() {
certificateProviderFactory = new CertificateProviderFactory(true, Region.of("us-east-1"),
"arn:aws:acm:us-east-1:account:certificate/1234-567-856456", 5L, "test", false, TEST_SSL_CERTIFICATE_FILE, TEST_SSL_KEY_FILE);
final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();

assertThat(certificateProvider, IsInstanceOf.instanceOf(ACMCertificateProvider.class));
}
}
7 changes: 7 additions & 0 deletions data-prepper-plugins/prometheus-sink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,10 @@ This plugin is compatible with Java 8. See

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
The integration tests for this plugin do not run as part of the Data Prepper build.

The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:prometheus-sink:integrationTest -Dtests.prometheus.sink.http.endpoint=<http-endpoint>
```
1 change: 1 addition & 0 deletions data-prepper-plugins/prometheus-sink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
implementation 'software.amazon.awssdk:apache-client'
implementation 'software.amazon.awssdk:sts'
implementation 'software.amazon.awssdk:acm'
implementation 'com.github.scribejava:scribejava-core:8.3.3'
testImplementation project(':data-prepper-test-common')
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ public class PrometheusSinkServiceIT {
" url: {0}\n" +
" http_method: POST\n" +
" auth_type: {1}\n" +
" ssl: false\n" +
" aws:\n" +
" region: {2}\n" +
" sts_role_arn: {3}\n";
" ssl: false\n";

private PrometheusSinkConfiguration prometheusSinkConfiguration;

Expand All @@ -65,7 +62,7 @@ public class PrometheusSinkServiceIT {
@BeforeEach
void setUp() throws JsonProcessingException{
this.urlString = System.getProperty("tests.prometheus.sink.http.endpoint");
String[] values = { urlString,"unauthenticated","ap-south-1","arn:aws:iam::524239988944:role/app-test" };
String[] values = { urlString,"unauthenticated"};
final String configYaml = MessageFormat.format(config, values);
this.prometheusSinkConfiguration = objectMapper.readValue(configYaml, PrometheusSinkConfiguration.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,82 +4,66 @@
*/
package org.opensearch.dataprepper.plugins.sink.prometheus;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.io.entity.StringEntity;
import com.github.scribejava.core.builder.api.DefaultApi20;
import com.github.scribejava.core.model.OAuth2AccessToken;
import com.github.scribejava.core.oauth.OAuth20Service;
import com.github.scribejava.core.builder.ServiceBuilder;
import org.opensearch.dataprepper.plugins.sink.prometheus.configuration.BearerTokenOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Base64;
import java.util.Map;

public class OAuthAccessTokenManager {

public static final String BASIC = "Basic ";

public static final String BEARER = "Bearer ";

public static final String APPLICATION_X_WWW_FORM_URLENCODED = "application/x-www-form-urlencoded";
public static final String EXP = "exp";
public static final String ACCESS_TOKEN = "access_token";

public static final String REFRESH_TOKEN = "refresh_token";

private final ObjectMapper objectMapper;

private HttpClientBuilder httpClientBuilder;


public OAuthAccessTokenManager(final HttpClientBuilder httpClientBuilder){
this.httpClientBuilder = httpClientBuilder;
this.objectMapper = new ObjectMapper();
}
private static final Logger LOG = LoggerFactory.getLogger(OAuthAccessTokenManager.class);

public String getAccessToken(final BearerTokenOptions bearerTokenOptions) {
HttpPost request = new HttpPost(bearerTokenOptions.getTokenURL());
request.setHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_X_WWW_FORM_URLENCODED);
request.setHeader(HttpHeaders.AUTHORIZATION, BASIC + base64Encode(bearerTokenOptions.getClientId() + ":" + bearerTokenOptions.getClientSecret()));
String requestBody = "grant_type=" + bearerTokenOptions.getGrantType() +"&refresh_token"+bearerTokenOptions.getRefreshToken()+"&scope=" + bearerTokenOptions.getScope();
request.setEntity(new StringEntity(requestBody, ContentType.APPLICATION_FORM_URLENCODED));
Map<String,String> accessTokenMap;
OAuth20Service service = getOAuth20ServiceObj(bearerTokenOptions);
OAuth2AccessToken accessTokenObj = null;
try {
ClassicHttpResponse response = (ClassicHttpResponse)httpClientBuilder.build().execute(request);
accessTokenMap = objectMapper.readValue(response.getEntity().getContent(),Map.class);
} catch (IOException e) {
throw new RuntimeException(e);
if(bearerTokenOptions.getRefreshToken() != null) {
accessTokenObj = new OAuth2AccessToken(bearerTokenOptions.getAccessToken(), bearerTokenOptions.getRefreshToken());
accessTokenObj = service.refreshAccessToken(accessTokenObj.getRefreshToken());

}else {
accessTokenObj = service.getAccessTokenClientCredentialsGrant();
}
bearerTokenOptions.setRefreshToken(accessTokenObj.getRefreshToken());
bearerTokenOptions.setAccessToken(accessTokenObj.getAccessToken());
bearerTokenOptions.setTokenExpired(accessTokenObj.getExpiresIn());
}catch (Exception e) {
LOG.info("Exception : "+ e.getMessage() );
}
bearerTokenOptions.setRefreshToken(accessTokenMap.get(REFRESH_TOKEN));
return BEARER + accessTokenMap.get(ACCESS_TOKEN);
return bearerTokenOptions.getAccessToken();
}

private static String base64Encode(String value) {
return java.util.Base64.getEncoder().encodeToString(value.getBytes());
}

public boolean isTokenExpired(final String token){
Base64.Decoder decoder = Base64.getUrlDecoder();
String[] chunks = token.substring(6).split("\\.");
final Map<String,Object> tokenDetails;
try {
ObjectMapper objectMapper = new ObjectMapper();
tokenDetails = objectMapper.readValue(new String(decoder.decode(chunks[1])), Map.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
final String expTime = String.valueOf(tokenDetails.get(EXP));
OffsetDateTime accessTokenExpTimeStamp = Instant.ofEpochMilli(Long.valueOf(expTime ) * 1000l).atOffset(ZoneOffset.UTC);
public boolean isTokenExpired(final Integer tokenExpired){
final Instant systemCurrentTimeStamp = Instant.now().atOffset(ZoneOffset.UTC).toInstant();
if(systemCurrentTimeStamp.compareTo(accessTokenExpTimeStamp.toInstant())>=0) {
Instant accessTokenExpTimeStamp = systemCurrentTimeStamp.plusSeconds(tokenExpired);
if(systemCurrentTimeStamp.compareTo(accessTokenExpTimeStamp)>=0) {
return true;
}
return false;
}

private OAuth20Service getOAuth20ServiceObj(BearerTokenOptions bearerTokenOptions){
return new ServiceBuilder(bearerTokenOptions.getClientId())
.apiSecret(bearerTokenOptions.getClientSecret())
.defaultScope(bearerTokenOptions.getScope())
.build(new DefaultApi20() {
@Override
public String getAccessTokenEndpoint() {
return bearerTokenOptions.getTokenUrl();
}

@Override
protected String getAuthorizationBaseUrl() {
return bearerTokenOptions.getTokenUrl();
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.hc.core5.ssl.TrustStrategy;
import org.apache.hc.core5.util.Timeout;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.s3.CertificateProviderFactory;
import org.opensearch.dataprepper.plugins.sink.prometheus.configuration.PrometheusSinkConfiguration;

import javax.net.ssl.SSLContext;
Expand Down
Loading

0 comments on commit f575689

Please sign in to comment.