Skip to content

Commit

Permalink
Adds AWS SigV4 signing in the Gatling performance tests. Also moves t…
Browse files Browse the repository at this point in the history
…he Gatling setup into constructors rather than static initializers. Resolves #3308.

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable committed Sep 7, 2023
1 parent 91ee61b commit 393697f
Show file tree
Hide file tree
Showing 12 changed files with 164 additions and 1 deletion.
1 change: 1 addition & 0 deletions performance-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ repositories {
}

dependencies {
gatlingImplementation 'software.amazon.awssdk:auth:2.20.67'
implementation 'com.fasterxml.jackson.core:jackson-core'
testRuntimeOnly testLibs.junit.engine
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class FixedClientSimulation extends Simulation {
.during(duration)
.on(Chain.sendApacheCommonLogPostRequest("Post logs with large batch", largeBatchSize));

public FixedClientSimulation()
{
setUp(fixedScenario.injectOpen(CoreDsl.atOnceUsers(users)))
.protocols(Protocol.httpProtocol())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class HttpStaticLoadSimulation extends Simulation {
.during(testDuration)
.on(Chain.sendApacheCommonLogPostRequest("Average log post request", 20));

public HttpStaticLoadSimulation()
{
setUp(httpStaticLoad.injectOpen(
CoreDsl.rampUsers(10).during(Duration.ofSeconds(10)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class HttpsStaticLoadSimulation extends Simulation {
.during(testDuration)
.on(Chain.sendApacheCommonLogPostRequest("Average log post request", 20));

public HttpsStaticLoadSimulation()
{
setUp(httpStaticLoad.injectOpen(
CoreDsl.rampUsers(10).during(Duration.ofSeconds(10)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class RampUpSimulation extends Simulation {
.during(peakLoadTime)
.on(Chain.sendApacheCommonLogPostRequest("Post logs with large batch", largeBatchSize));

public RampUpSimulation()
{
setUp(
rampUpScenario.injectOpen(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ public class SingleRequestSimulation extends Simulation {
.post(PathTarget.getPath())
.body(CoreDsl.ElFileBody("bodies/singleLog.json"))
.asJson()
.check(HttpDsl.status().is(200), CoreDsl.responseTimeInMillis().lt(200))
.check(HttpDsl.status().is(200), CoreDsl.responseTimeInMillis().lt(500))
);

ScenarioBuilder basicScenario = CoreDsl.scenario("Post static json log file")
.exec(sendSingleLogFile);


public SingleRequestSimulation()
{

setUp(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class SlowBurnSimulation extends Simulation {
.forever()
.on(Chain.sendApacheCommonLogPostRequest("Post logs with large batch", largeBatchSize));

public SlowBurnSimulation()
{
setUp(
rampUpScenario.injectOpen(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ private static PopulationBuilder runScenarioWithTargetRps(final ScenarioBuilder
).protocols(Protocol.httpProtocol());
}

public TargetRpsSimulation()
{
setUp(
runScenarioWithTargetRps(smallBatchScenario, 400),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ private static Map<String, String> logObject(final String logValue) {
.during(testDuration)
.on(sendMultipleGrokPatterns);

public VariousGrokPatternsSimulation()
{
setUp(sendMultipleGrokPatternsScenario.injectOpen(
CoreDsl.rampUsers(rampUsers).during(rampUpTime)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package org.opensearch.dataprepper.test.performance.tools;

import io.gatling.http.client.Request;
import io.netty.handler.codec.http.HttpHeaders;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.regions.Region;

import java.io.ByteArrayInputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Function;

public class AwsRequestSigner implements Consumer<Request> {
static final String SIGNER_NAME = "aws_sigv4";

/**
* Constant to check content-length
*/
private static final String CONTENT_LENGTH = "content-length";
/**
* Constant to check Zero content length
*/
private static final String ZERO_CONTENT_LENGTH = "0";
/**
* Constant to check if host is the endpoint
*/
private static final String HOST = "host";

private static final String REGION_PROPERTY = "aws_region";
private static final String SERVICE_NAME_PROPERTY = "aws_service";


private final Aws4Signer awsSigner;
private final AwsCredentialsProvider credentialsProvider;
private final Region region;
private final String service;

public AwsRequestSigner() {
region = getRequiredProperty(REGION_PROPERTY, Region::of);
service = getRequiredProperty(SERVICE_NAME_PROPERTY, Function.identity());

awsSigner = Aws4Signer.create();
credentialsProvider = DefaultCredentialsProvider.create();
}

private static <T> T getRequiredProperty(String propertyName, Function<String, T> transform) {
String inputString = System.getProperty(propertyName);
if(inputString == null) {
throw new RuntimeException("Using " + SIGNER_NAME + " authentication requires providing the " + propertyName + " system property.");
}

try {
return transform.apply(inputString);
} catch (Exception ex) {
throw new RuntimeException("Unable to process property " + propertyName + " with error: " + ex.getMessage());
}
}

@Override
public void accept(Request request) {
ExecutionAttributes attributes = new ExecutionAttributes();
attributes.putAttribute(AwsSignerExecutionAttribute.AWS_CREDENTIALS, credentialsProvider.resolveCredentials());
attributes.putAttribute(AwsSignerExecutionAttribute.SERVICE_SIGNING_NAME, service);
attributes.putAttribute(AwsSignerExecutionAttribute.SIGNING_REGION, region);

SdkHttpFullRequest incomingSdkRequest = convertIncomingRequest(request);

SdkHttpFullRequest signedRequest = awsSigner.sign(incomingSdkRequest, attributes);

modifyOutgoingRequest(request, signedRequest);
}

private SdkHttpFullRequest convertIncomingRequest(Request request) {
URI uri;
try {
uri = request.getUri().toJavaNetURI();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}

SdkHttpFullRequest.Builder requestBuilder = SdkHttpFullRequest.builder()
.method(SdkHttpMethod.fromValue(request.getMethod().name()))
.uri(uri);

requestBuilder.contentStreamProvider(() -> new ByteArrayInputStream(request.getBody().getBytes()));
requestBuilder.headers(headerArrayToMap(request.getHeaders()));

return requestBuilder.build();
}

private static Map<String, List<String>> headerArrayToMap(final HttpHeaders headers) {
Map<String, List<String>> headersMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (Map.Entry<String, String> header : headers) {
if (!skipHeader(header)) {
headersMap.put(header.getKey(), headersMap
.getOrDefault(header.getKey(),
new LinkedList<>(Collections.singletonList(header.getValue()))));
}
}
return headersMap;
}

private static boolean skipHeader(final Map.Entry<String, String> header) {
return (CONTENT_LENGTH.equalsIgnoreCase(header.getKey())
&& ZERO_CONTENT_LENGTH.equals(header.getValue())) // Strip Content-Length: 0
|| HOST.equalsIgnoreCase(header.getKey()); // Host comes from endpoint
}

private void modifyOutgoingRequest(Request request, SdkHttpFullRequest signedRequest) {
resetHeaders(request, signedRequest);
}

private void resetHeaders(Request request, SdkHttpFullRequest signedRequest) {
request.getHeaders().clear();

for (Map.Entry<String, List<String>> headerEntry : signedRequest.headers().entrySet()) {
for (String value : headerEntry.getValue()) {
request.getHeaders().add(headerEntry.getKey(), value);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public static HttpProtocolBuilder httpsProtocol(final Integer port) {
private static HttpProtocolBuilder httpProtocol(final String protocol, final String host, final Integer port) {
return HttpDsl.http
.baseUrl(asUrl(protocol, host, port))
.sign(SignerProvider.getSigner())
.acceptHeader("application/json")
.header("Content-Type", "application/json");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.opensearch.dataprepper.test.performance.tools;

import io.gatling.http.client.Request;

import java.util.function.Consumer;

public class SignerProvider {
private static final Consumer<Request> NO_OP_SIGNER = r -> { };

public static Consumer<Request> getSigner() {
String authentication = System.getProperty("authentication");

if(AwsRequestSigner.SIGNER_NAME.equals(authentication)) {
return new AwsRequestSigner();
}

return NO_OP_SIGNER;
}
}

0 comments on commit 393697f

Please sign in to comment.