diff --git a/performance-test/build.gradle b/performance-test/build.gradle index 65149c9b2c..7b508fe869 100644 --- a/performance-test/build.gradle +++ b/performance-test/build.gradle @@ -15,6 +15,9 @@ repositories { } dependencies { + gatling "io.opentelemetry:opentelemetry-proto:${versionMap.opentelemetryProto}" + gatling "org.apache.commons:commons-lang3:3.12.0" + gatling 'com.google.protobuf:protobuf-java-util:3.19.1' implementation 'com.fasterxml.jackson.core:jackson-core:2.13.0' testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.2' testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.2' diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/FixedClientTraceSimulation.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/FixedClientTraceSimulation.java new file mode 100644 index 0000000000..ecc274f1b6 --- /dev/null +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/FixedClientTraceSimulation.java @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.test.performance; + +import io.gatling.javaapi.core.CoreDsl; +import io.gatling.javaapi.core.ScenarioBuilder; +import io.gatling.javaapi.core.Simulation; +import io.gatling.javaapi.http.HttpDsl; +import org.opensearch.dataprepper.test.performance.tools.TraceChain; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +public class FixedClientTraceSimulation extends Simulation { + private static final Integer batchSize = Integer.getInteger("batchSize", 20); + private static final Integer users = Integer.getInteger("users", 10); + private static final Duration duration = Duration.ofMinutes(Long.getLong("duration", 5)); + private static final List endpoints = Arrays.asList(System.getProperty("endpoints").split(",")); + + ScenarioBuilder fixedScenario = CoreDsl.scenario("Slow Burn") + .during(duration) + .on(TraceChain.sendUnframedExportTraceServiceRequest(endpoints, batchSize)); + + { + setUp(fixedScenario.injectOpen(CoreDsl.atOnceUsers(users))) + .protocols(HttpDsl.http + .acceptHeader("application/json") + .header("Content-Type", "application/json; charset=utf-8")) + .assertions(CoreDsl.global().requestsPerSec().gt(140.0)); + } +} diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/SlowBurnTraceSimulation.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/SlowBurnTraceSimulation.java new file mode 100644 index 0000000000..25e2ef5cff --- /dev/null +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/SlowBurnTraceSimulation.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.test.performance; + +import io.gatling.javaapi.core.CoreDsl; +import io.gatling.javaapi.core.ScenarioBuilder; +import io.gatling.javaapi.core.Simulation; +import org.opensearch.dataprepper.test.performance.tools.Protocol; +import org.opensearch.dataprepper.test.performance.tools.TraceChain; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +public class SlowBurnTraceSimulation extends Simulation { + private static final Integer batchSize = Integer.getInteger("batchSize", 200); + private static final Integer rampUsers = Integer.getInteger("rampUsers", 60); + private static final Duration rampUpTime = Duration.ofMinutes(Integer.getInteger("rampUpTime", 60)); + private static final Duration peakDuration = Duration.ofMinutes(Integer.getInteger("rampUpTime", 5)); + private static final List endpoints = Arrays.asList(System.getProperty("endpoints").split(",")); + + ScenarioBuilder rampUpScenario = CoreDsl.scenario("Slow Burn") + .forever() + .on(TraceChain.sendUnframedExportTraceServiceRequest(endpoints, batchSize)); + + { + setUp( + rampUpScenario.injectOpen( + CoreDsl.rampUsers(rampUsers).during(rampUpTime), + CoreDsl.nothingFor(peakDuration) + ) + ) + .maxDuration(rampUpTime.plus(peakDuration)) + .protocols(Protocol.httpProtocol()); + } +} diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/TraceChain.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/TraceChain.java new file mode 100644 index 0000000000..c579f62253 --- /dev/null +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/TraceChain.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.test.performance.tools; + +import com.google.protobuf.util.JsonFormat; +import io.gatling.javaapi.core.ChainBuilder; +import io.gatling.javaapi.core.CoreDsl; +import io.gatling.javaapi.http.HttpDsl; + +import java.util.List; + +public class TraceChain { + private static final JsonFormat.Printer PRINTER = JsonFormat.printer().omittingInsignificantWhitespace(); + + public static ChainBuilder sendUnframedExportTraceServiceRequest(final List endpoints, + final int batchSize) { + ChainBuilder chainBuilder = ChainBuilder.EMPTY; + chainBuilder = chainBuilder.exec(session -> { + final List requestJsons = TraceTemplates.exportTraceServiceRequestJsons(endpoints.size(), batchSize); + for (int i = 0; i < endpoints.size(); i++) { + session = session.set(endpoints.get(i), requestJsons.get(i)); + } + return session; + }); + for (final String endpoint : endpoints) { + chainBuilder = chainBuilder.exec( + HttpDsl.http("Post to " + endpoint) + .post("http://" + endpoint + "/opentelemetry.proto.collector.trace.v1.TraceService/Export") + .body(CoreDsl.StringBody(session -> session.getString(endpoint)))); + } + return chainBuilder; + } +} diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/TraceTemplates.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/TraceTemplates.java new file mode 100644 index 0000000000..4547b5d4e7 --- /dev/null +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/TraceTemplates.java @@ -0,0 +1,141 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.test.performance.tools; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.resource.v1.Resource; +import io.opentelemetry.proto.trace.v1.InstrumentationLibrarySpans; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import io.opentelemetry.proto.trace.v1.Span; +import io.opentelemetry.proto.trace.v1.Status; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +public class TraceTemplates { + private static final JsonFormat.Printer PRINTER = JsonFormat.printer().omittingInsignificantWhitespace(); + private static final Random RANDOM = new Random(); + private static final List SERVICE_NAMES = Arrays.asList("SERVICE_A", "SERVICE_B", "SERVICE_C", "SERVICE_D", "SERVICE_E"); + private static final List SPAN_NAMES = Arrays.asList("SPAN_A", "SPAN_B", "SPAN_C", "SPAN_D", "SPAN_E"); + + public static List exportTraceServiceRequestJsons(final int peerListSize, final int batchSize) { + return getExportTraceServiceRequests(peerListSize, batchSize).stream().map(exportTraceServiceRequest -> { + try { + return PRINTER.print(exportTraceServiceRequest); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + } + + public static List getExportTraceServiceRequests(final int peerListSize, final int batchSize) { + final List results = new ArrayList<>(); + final List resourceSpansList = getTraceGroupResourceSpans(peerListSize * batchSize); + for (int i=0; i resourceSpansList) { + return ExportTraceServiceRequest.newBuilder() + .addAllResourceSpans(resourceSpansList) + .build(); + } + + public static List getTraceGroupResourceSpans(final int traceGroupSize) { + final ArrayList spansList = new ArrayList<>(); + final byte[] traceId = getRandomBytes(16); + final byte[] rootSpanId = getRandomBytes(8); + final ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current(); + for(int i = 0; i < traceGroupSize; i++) { + final byte[] parentId = (i == 0? null : rootSpanId); + final byte[] spanId = (i == 0? rootSpanId : getRandomBytes(8)); + final int randomServiceIndex = threadLocalRandom.nextInt(SERVICE_NAMES.size()); + final int randomSpanNameIndex = threadLocalRandom.nextInt(SPAN_NAMES.size()); + final String serviceName = SERVICE_NAMES.get(randomServiceIndex); + final String spanName = SPAN_NAMES.get(randomSpanNameIndex); + final Span.SpanKind spanKind = Span.SpanKind.SPAN_KIND_SERVER; + final long endTime = System.currentTimeMillis() * 1000000; + final long durationInNanos = 100000 + RANDOM.nextInt(500000); + final ResourceSpans rs = getResourceSpans( + traceId, + spanId, + parentId, + serviceName, + spanName, + spanKind, + endTime, + durationInNanos, + 1 + ); + spansList.add(rs); + } + return spansList; + } + + public static ResourceSpans getResourceSpans(final byte[] traceId, final byte[] spanId, final byte[] parentId, + final String serviceName, final String spanName, final Span.SpanKind spanKind, + final long endTimeInNanos, final long durationInNanos, final Integer statusCode) { + final ByteString parentSpanId = parentId != null ? ByteString.copyFrom(parentId) : ByteString.EMPTY; + final long startTimeInNanos = endTimeInNanos - durationInNanos; + final KeyValue.Builder kvBuilder = KeyValue.newBuilder() + .setKey("key") + .setValue(AnyValue.newBuilder().setStringValue("value")); + return ResourceSpans.newBuilder() + .setResource( + Resource.newBuilder() + .addAttributes(KeyValue.newBuilder() + .setKey("service.name") + .setValue(AnyValue.newBuilder().setStringValue(serviceName).build()).build()) + .build() + ) + .addInstrumentationLibrarySpans( + 0, + InstrumentationLibrarySpans.newBuilder() + .addSpans( + Span.newBuilder() + .setName(spanName) + .setKind(spanKind) + .setSpanId(ByteString.copyFrom(spanId)) + .setParentSpanId(parentSpanId) + .setTraceId(ByteString.copyFrom(traceId)) + .setStartTimeUnixNano(startTimeInNanos) + .setEndTimeUnixNano(endTimeInNanos) + .setStatus(Status.newBuilder().setCodeValue(statusCode)) + .addEvents(Span.Event.newBuilder() + .setName(spanName) + .setTimeUnixNano(endTimeInNanos) + .addAttributes(kvBuilder)) + .addLinks(Span.Link.newBuilder() + .setTraceId(ByteString.copyFrom(traceId)) + .setSpanId(ByteString.copyFrom(spanId)) + .setTraceState("test state") + .addAttributes(kvBuilder)) + .build() + ) + .build() + ) + .build(); + } + + public static byte[] getRandomBytes(int len) { + byte[] bytes = new byte[len]; + RANDOM.nextBytes(bytes); + return bytes; + } +}