Skip to content

Commit

Permalink
HttpSink Plugin Functionality for #874. (#3036)
Browse files Browse the repository at this point in the history
* HttpSink Plugin Functionality for #874.
Signed-off-by: mallikagogoi7 <[email protected]>

* Fixed review comments for #874.
Signed-off-by: mallikagogoi7 <[email protected]>

* Fixes for #874.
Signed-off-by: mallikagogoi7 <[email protected]>
  • Loading branch information
mallikagogoi7 authored Aug 1, 2023
1 parent 5e3765b commit f236af9
Show file tree
Hide file tree
Showing 14 changed files with 1,074 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
/**
* Check threshold limits.
*/
public class ThresholdCheck {
public class ThresholdValidator {

private ThresholdCheck() {
private ThresholdValidator() {
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class ThresholdCheckTest {
class ThresholdValidatorTest {

private Buffer inMemoryBuffer;

Expand All @@ -34,7 +34,7 @@ void test_exceedThreshold_true_dueTo_maxEvents_is_less_than_buffered_event_count
final int maxEvents = 95;
final ByteCount maxBytes = ByteCount.parse("50kb");
final long maxCollectionDuration = 15;
boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, maxEvents,
boolean isThresholdExceed = ThresholdValidator.checkThresholdExceed(inMemoryBuffer, maxEvents,
maxBytes, maxCollectionDuration);
assertTrue(isThresholdExceed, "Threshold not exceeded");
}
Expand All @@ -44,7 +44,7 @@ void test_exceedThreshold_false_dueTo_maxEvents_is_greater_than_buffered_event_c
final int maxEvents = 105;
final ByteCount maxBytes = ByteCount.parse("50mb");
final long maxCollectionDuration = 50;
boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, maxEvents, maxBytes,
boolean isThresholdExceed = ThresholdValidator.checkThresholdExceed(inMemoryBuffer, maxEvents, maxBytes,
maxCollectionDuration);
assertFalse(isThresholdExceed, "Threshold exceeded");
}
Expand All @@ -54,7 +54,7 @@ void test_exceedThreshold_ture_dueTo_maxBytes_is_less_than_buffered_byte_count()
final int maxEvents = 500;
final ByteCount maxBytes = ByteCount.parse("1b");
final long maxCollectionDuration = 15;
boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, maxEvents, maxBytes,
boolean isThresholdExceed = ThresholdValidator.checkThresholdExceed(inMemoryBuffer, maxEvents, maxBytes,
maxCollectionDuration);
assertTrue(isThresholdExceed, "Threshold not exceeded");
}
Expand All @@ -64,7 +64,7 @@ void test_exceedThreshold_false_dueTo_maxBytes_is_greater_than_buffered_byte_cou
final int maxEvents = 500;
final ByteCount maxBytes = ByteCount.parse("8mb");
final long maxCollectionDuration = 15;
boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, maxEvents,
boolean isThresholdExceed = ThresholdValidator.checkThresholdExceed(inMemoryBuffer, maxEvents,
maxBytes, maxCollectionDuration);
assertFalse(isThresholdExceed, "Threshold exceeded");
}
Expand All @@ -81,7 +81,7 @@ void test_exceedThreshold_ture_dueTo_maxCollectionDuration_is_less_than_buffered
synchronized (this) {
while (inMemoryBuffer.getEventCount() < 100) {
inMemoryBuffer.writeEvent(generateByteArray());
isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, maxEvents,
isThresholdExceed = ThresholdValidator.checkThresholdExceed(inMemoryBuffer, maxEvents,
maxBytes, maxCollectionDuration);
if (isThresholdExceed) {
break;
Expand All @@ -105,7 +105,7 @@ void test_exceedThreshold_ture_dueTo_maxCollectionDuration_is_greater_than_buffe
synchronized (this) {
while (inMemoryBuffer.getEventCount() < 100) {
inMemoryBuffer.writeEvent(generateByteArray());
isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer,
isThresholdExceed = ThresholdValidator.checkThresholdExceed(inMemoryBuffer,
maxEvents, maxBytes, maxCollectionDuration);
if (isThresholdExceed) {
break;
Expand Down
13 changes: 12 additions & 1 deletion data-prepper-plugins/http-sink/build.gradle
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@

dependencies {
implementation project(':data-prepper-api')
implementation libs.armeria.core
implementation project(path: ':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.apache.commons:commons-compress:1.21'
implementation 'joda-time:joda-time:2.11.1'
implementation project(':data-prepper-plugins:common')
implementation project(path: ':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:apache-client'
implementation 'software.amazon.awssdk:sts'
implementation 'software.amazon.awssdk:acm'
implementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.2'
implementation 'software.amazon.awssdk:auth'
implementation 'org.apache.commons:commons-lang3:3.12.0'
implementation project(':data-prepper-plugins:failures-common')
implementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.2'
testImplementation project(':data-prepper-test-common')
}

test {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
/*
* Copyright OpenSearch Contributors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
* the License. A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.opensearch.dataprepper.plugins.sink;

import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequestInterceptor;
import org.apache.hc.core5.http.io.entity.BasicHttpEntity;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.net.URIBuilder;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.signer.Signer;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.regions.Region;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST;

/**
* An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer}
* and {@link AwsCredentialsProvider}.
*/
public final class AwsRequestSigningApacheInterceptor implements HttpRequestInterceptor {

/**
* Constant to check content-length
*/
private static final String CONTENT_LENGTH = "content-length";
/**
* Constant to check Zero content length
*/
private static final String ZERO_CONTENT_LENGTH = "0";
/**
* Constant to check if host is the endpoint
*/
private static final String HOST = "host";

/**
* The service that we're connecting to.
*/
private final String service;

/**
* The particular signer implementation.
*/
private final Signer signer;

/**
* The source of AWS credentials for signing.
*/
private final AwsCredentialsProvider awsCredentialsProvider;

/**
* The region signing region.
*/
private final Region region;

/**
*
* @param service service that we're connecting to
* @param signer particular signer implementation
* @param awsCredentialsProvider source of AWS credentials for signing
* @param region signing region
*/
public AwsRequestSigningApacheInterceptor(final String service,
final Signer signer,
final AwsCredentialsProvider awsCredentialsProvider,
final Region region) {
this.service = Objects.requireNonNull(service);
this.signer = Objects.requireNonNull(signer);
this.awsCredentialsProvider = Objects.requireNonNull(awsCredentialsProvider);
this.region = Objects.requireNonNull(region);
}

/**
*
* @param service service that we're connecting to
* @param signer particular signer implementation
* @param awsCredentialsProvider source of AWS credentials for signing
* @param region signing region
*/
public AwsRequestSigningApacheInterceptor(final String service,
final Signer signer,
final AwsCredentialsProvider awsCredentialsProvider,
final String region) {
this(service, signer, awsCredentialsProvider, Region.of(region));
}

/**
* {@inheritDoc}
*/
@Override
public void process(final HttpRequest request, final EntityDetails entity, final HttpContext context)
throws IOException {
URIBuilder uriBuilder;
try {
uriBuilder = new URIBuilder(request.getUri());
} catch (URISyntaxException e) {
throw new IOException("Invalid URI", e);
}

// Copy Apache HttpRequest to AWS Request
SdkHttpFullRequest.Builder requestBuilder = SdkHttpFullRequest.builder()
.method(SdkHttpMethod.fromValue(request.getMethod()))
.uri(buildUri(context, uriBuilder));

if (request instanceof ClassicHttpRequest) {
ClassicHttpRequest classicHttpRequest =
(ClassicHttpRequest) request;
if (classicHttpRequest.getEntity() != null) {
InputStream content = classicHttpRequest.getEntity().getContent();
requestBuilder.contentStreamProvider(() -> content);
}
}
requestBuilder.rawQueryParameters(nvpToMapParams(uriBuilder.getQueryParams()));
requestBuilder.headers(headerArrayToMap(request.getHeaders()));

ExecutionAttributes attributes = new ExecutionAttributes();
attributes.putAttribute(AwsSignerExecutionAttribute.AWS_CREDENTIALS, awsCredentialsProvider.resolveCredentials());
attributes.putAttribute(AwsSignerExecutionAttribute.SERVICE_SIGNING_NAME, service);
attributes.putAttribute(AwsSignerExecutionAttribute.SIGNING_REGION, region);

// Sign it
SdkHttpFullRequest signedRequest = signer.sign(requestBuilder.build(), attributes);

// Now copy everything back
request.setHeaders(mapToHeaderArray(signedRequest.headers()));
if (request instanceof ClassicHttpRequest) {
ClassicHttpRequest classicHttpRequest =
(ClassicHttpRequest) request;
if (classicHttpRequest.getEntity() != null) {
HttpEntity basicHttpEntity = new BasicHttpEntity(signedRequest.contentStreamProvider()
.orElseThrow(() -> new IllegalStateException("There must be content"))
.newStream(), ContentType.APPLICATION_JSON);
classicHttpRequest.setEntity(basicHttpEntity);
}
}
}

private URI buildUri(final HttpContext context, URIBuilder uriBuilder) throws IOException {
try {
HttpHost host = (HttpHost) context.getAttribute(HTTP_TARGET_HOST);

if (host != null) {
uriBuilder.setHost(host.getHostName());
uriBuilder.setScheme(host.getSchemeName());
uriBuilder.setPort(host.getPort());
}

return uriBuilder.build();
} catch (URISyntaxException e) {
throw new IOException("Invalid URI", e);
}
}

/**
*
* @param params list of HTTP query params as NameValuePairs
* @return a multimap of HTTP query params
*/
private static Map<String, List<String>> nvpToMapParams(final List<NameValuePair> params) {
Map<String, List<String>> parameterMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (NameValuePair nvp : params) {
List<String> argsList =
parameterMap.computeIfAbsent(nvp.getName(), k -> new ArrayList<>());
argsList.add(nvp.getValue());
}
return parameterMap;
}

/**
* @param headers modelled Header objects
* @return a Map of header entries
*/
private static Map<String, List<String>> headerArrayToMap(final Header[] headers) {
Map<String, List<String>> headersMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (Header header : headers) {
if (!skipHeader(header)) {
headersMap.put(header.getName(), headersMap
.getOrDefault(header.getName(),
new LinkedList<>(Collections.singletonList(header.getValue()))));
}
}
return headersMap;
}

/**
* @param header header line to check
* @return true if the given header should be excluded when signing
*/
private static boolean skipHeader(final Header header) {
return (CONTENT_LENGTH.equalsIgnoreCase(header.getName())
&& ZERO_CONTENT_LENGTH.equals(header.getValue())) // Strip Content-Length: 0
|| HOST.equalsIgnoreCase(header.getName()); // Host comes from endpoint
}

/**
* @param mapHeaders Map of header entries
* @return modelled Header objects
*/
private static Header[] mapToHeaderArray(final Map<String, List<String>> mapHeaders) {
Header[] headers = new Header[mapHeaders.size()];
int i = 0;
for (Map.Entry<String, List<String>> headerEntry : mapHeaders.entrySet()) {
for (String value : headerEntry.getValue()) {
headers[i++] = new BasicHeader(headerEntry.getKey(), value);
}
}
return headers;
}
}
Loading

0 comments on commit f236af9

Please sign in to comment.