-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Performance test: trace-ingestion load generator #983
Changes from all commits
9860a0b
17142a7
92e732e
6330559
2eb414f
b5cee9d
0f6b1c6
863554a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.test.performance; | ||
|
||
import io.gatling.javaapi.core.CoreDsl; | ||
import io.gatling.javaapi.core.ScenarioBuilder; | ||
import io.gatling.javaapi.core.Simulation; | ||
import io.gatling.javaapi.http.HttpDsl; | ||
import org.opensearch.dataprepper.test.performance.tools.TraceChain; | ||
|
||
import java.time.Duration; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
|
||
public class FixedClientTraceSimulation extends Simulation { | ||
private static final Integer batchSize = Integer.getInteger("batchSize", 20); | ||
private static final Integer users = Integer.getInteger("users", 10); | ||
private static final Duration duration = Duration.ofMinutes(Long.getLong("duration", 5)); | ||
private static final List<String> endpoints = Arrays.asList(System.getProperty("endpoints").split(",")); | ||
|
||
ScenarioBuilder fixedScenario = CoreDsl.scenario("Slow Burn") | ||
.during(duration) | ||
.on(TraceChain.sendUnframedExportTraceServiceRequest(endpoints, batchSize)); | ||
|
||
{ | ||
setUp(fixedScenario.injectOpen(CoreDsl.atOnceUsers(users))) | ||
.protocols(HttpDsl.http | ||
.acceptHeader("application/json") | ||
.header("Content-Type", "application/json; charset=utf-8")) | ||
.assertions(CoreDsl.global().requestsPerSec().gt(140.0)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.test.performance; | ||
|
||
import io.gatling.javaapi.core.CoreDsl; | ||
import io.gatling.javaapi.core.ScenarioBuilder; | ||
import io.gatling.javaapi.core.Simulation; | ||
import org.opensearch.dataprepper.test.performance.tools.Protocol; | ||
import org.opensearch.dataprepper.test.performance.tools.TraceChain; | ||
|
||
import java.time.Duration; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
|
||
public class SlowBurnTraceSimulation extends Simulation { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The naming choice is a little confusing. |
||
private static final Integer batchSize = Integer.getInteger("batchSize", 200); | ||
private static final Integer rampUsers = Integer.getInteger("rampUsers", 60); | ||
private static final Duration rampUpTime = Duration.ofMinutes(Integer.getInteger("rampUpTime", 60)); | ||
private static final Duration peakDuration = Duration.ofMinutes(Integer.getInteger("rampUpTime", 5)); | ||
private static final List<String> endpoints = Arrays.asList(System.getProperty("endpoints").split(",")); | ||
|
||
ScenarioBuilder rampUpScenario = CoreDsl.scenario("Slow Burn") | ||
.forever() | ||
.on(TraceChain.sendUnframedExportTraceServiceRequest(endpoints, batchSize)); | ||
|
||
{ | ||
setUp( | ||
rampUpScenario.injectOpen( | ||
CoreDsl.rampUsers(rampUsers).during(rampUpTime), | ||
CoreDsl.nothingFor(peakDuration) | ||
) | ||
) | ||
.maxDuration(rampUpTime.plus(peakDuration)) | ||
.protocols(Protocol.httpProtocol()); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.test.performance.tools; | ||
|
||
import com.google.protobuf.util.JsonFormat; | ||
import io.gatling.javaapi.core.ChainBuilder; | ||
import io.gatling.javaapi.core.CoreDsl; | ||
import io.gatling.javaapi.http.HttpDsl; | ||
|
||
import java.util.List; | ||
|
||
public class TraceChain { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's call this: TraceChainBuilder. If we are introducing another |
||
private static final JsonFormat.Printer PRINTER = JsonFormat.printer().omittingInsignificantWhitespace(); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's create a private constructor here:
|
||
public static ChainBuilder sendUnframedExportTraceServiceRequest(final List<String> endpoints, | ||
final int batchSize) { | ||
ChainBuilder chainBuilder = ChainBuilder.EMPTY; | ||
chainBuilder = chainBuilder.exec(session -> { | ||
final List<String> requestJsons = TraceTemplates.exportTraceServiceRequestJsons(endpoints.size(), batchSize); | ||
for (int i = 0; i < endpoints.size(); i++) { | ||
session = session.set(endpoints.get(i), requestJsons.get(i)); | ||
} | ||
return session; | ||
}); | ||
for (final String endpoint : endpoints) { | ||
chainBuilder = chainBuilder.exec( | ||
HttpDsl.http("Post to " + endpoint) | ||
.post("http://" + endpoint + "/opentelemetry.proto.collector.trace.v1.TraceService/Export") | ||
.body(CoreDsl.StringBody(session -> session.getString(endpoint)))); | ||
} | ||
return chainBuilder; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
/* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you run the formatter on this code? I am noticing some spacing patterns that are making it difficult to read. |
||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.test.performance.tools; | ||
|
||
import com.google.protobuf.ByteString; | ||
import com.google.protobuf.InvalidProtocolBufferException; | ||
import com.google.protobuf.util.JsonFormat; | ||
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; | ||
import io.opentelemetry.proto.common.v1.AnyValue; | ||
import io.opentelemetry.proto.common.v1.KeyValue; | ||
import io.opentelemetry.proto.resource.v1.Resource; | ||
import io.opentelemetry.proto.trace.v1.InstrumentationLibrarySpans; | ||
import io.opentelemetry.proto.trace.v1.ResourceSpans; | ||
import io.opentelemetry.proto.trace.v1.Span; | ||
import io.opentelemetry.proto.trace.v1.Status; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Random; | ||
import java.util.concurrent.ThreadLocalRandom; | ||
import java.util.stream.Collectors; | ||
|
||
public class TraceTemplates { | ||
private static final JsonFormat.Printer PRINTER = JsonFormat.printer().omittingInsignificantWhitespace(); | ||
private static final Random RANDOM = new Random(); | ||
private static final List<String> SERVICE_NAMES = Arrays.asList("SERVICE_A", "SERVICE_B", "SERVICE_C", "SERVICE_D", "SERVICE_E"); | ||
private static final List<String> SPAN_NAMES = Arrays.asList("SPAN_A", "SPAN_B", "SPAN_C", "SPAN_D", "SPAN_E"); | ||
|
||
public static List<String> exportTraceServiceRequestJsons(final int peerListSize, final int batchSize) { | ||
return getExportTraceServiceRequests(peerListSize, batchSize).stream().map(exportTraceServiceRequest -> { | ||
try { | ||
return PRINTER.print(exportTraceServiceRequest); | ||
} catch (InvalidProtocolBufferException e) { | ||
throw new RuntimeException(e); | ||
} | ||
}).collect(Collectors.toList()); | ||
} | ||
|
||
public static List<ExportTraceServiceRequest> getExportTraceServiceRequests(final int peerListSize, final int batchSize) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do any of the static functions below need to be pubic? |
||
final List<ExportTraceServiceRequest> results = new ArrayList<>(); | ||
final List<ResourceSpans> resourceSpansList = getTraceGroupResourceSpans(peerListSize * batchSize); | ||
for (int i=0; i<peerListSize*batchSize; i+=batchSize) { | ||
results.add(getExportTraceServiceRequest(resourceSpansList.subList(i, i+batchSize))); | ||
} | ||
Collections.shuffle(results); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Won't this impact reproducibility with the workload? |
||
return results; | ||
} | ||
|
||
public static ExportTraceServiceRequest getExportTraceServiceRequest(final List<ResourceSpans> resourceSpansList) { | ||
return ExportTraceServiceRequest.newBuilder() | ||
.addAllResourceSpans(resourceSpansList) | ||
.build(); | ||
} | ||
|
||
public static List<ResourceSpans> getTraceGroupResourceSpans(final int traceGroupSize) { | ||
final ArrayList<ResourceSpans> spansList = new ArrayList<>(); | ||
final byte[] traceId = getRandomBytes(16); | ||
final byte[] rootSpanId = getRandomBytes(8); | ||
final ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current(); | ||
for(int i = 0; i < traceGroupSize; i++) { | ||
final byte[] parentId = (i == 0? null : rootSpanId); | ||
final byte[] spanId = (i == 0? rootSpanId : getRandomBytes(8)); | ||
final int randomServiceIndex = threadLocalRandom.nextInt(SERVICE_NAMES.size()); | ||
final int randomSpanNameIndex = threadLocalRandom.nextInt(SPAN_NAMES.size()); | ||
final String serviceName = SERVICE_NAMES.get(randomServiceIndex); | ||
final String spanName = SPAN_NAMES.get(randomSpanNameIndex); | ||
final Span.SpanKind spanKind = Span.SpanKind.SPAN_KIND_SERVER; | ||
final long endTime = System.currentTimeMillis() * 1000000; | ||
final long durationInNanos = 100000 + RANDOM.nextInt(500000); | ||
final ResourceSpans rs = getResourceSpans( | ||
traceId, | ||
spanId, | ||
parentId, | ||
serviceName, | ||
spanName, | ||
spanKind, | ||
endTime, | ||
durationInNanos, | ||
1 | ||
); | ||
spansList.add(rs); | ||
} | ||
return spansList; | ||
} | ||
|
||
public static ResourceSpans getResourceSpans(final byte[] traceId, final byte[] spanId, final byte[] parentId, | ||
final String serviceName, final String spanName, final Span.SpanKind spanKind, | ||
final long endTimeInNanos, final long durationInNanos, final Integer statusCode) { | ||
final ByteString parentSpanId = parentId != null ? ByteString.copyFrom(parentId) : ByteString.EMPTY; | ||
final long startTimeInNanos = endTimeInNanos - durationInNanos; | ||
final KeyValue.Builder kvBuilder = KeyValue.newBuilder() | ||
.setKey("key") | ||
.setValue(AnyValue.newBuilder().setStringValue("value")); | ||
return ResourceSpans.newBuilder() | ||
.setResource( | ||
Resource.newBuilder() | ||
.addAttributes(KeyValue.newBuilder() | ||
.setKey("service.name") | ||
.setValue(AnyValue.newBuilder().setStringValue(serviceName).build()).build()) | ||
.build() | ||
) | ||
.addInstrumentationLibrarySpans( | ||
0, | ||
InstrumentationLibrarySpans.newBuilder() | ||
.addSpans( | ||
Span.newBuilder() | ||
.setName(spanName) | ||
.setKind(spanKind) | ||
.setSpanId(ByteString.copyFrom(spanId)) | ||
.setParentSpanId(parentSpanId) | ||
.setTraceId(ByteString.copyFrom(traceId)) | ||
.setStartTimeUnixNano(startTimeInNanos) | ||
.setEndTimeUnixNano(endTimeInNanos) | ||
.setStatus(Status.newBuilder().setCodeValue(statusCode)) | ||
.addEvents(Span.Event.newBuilder() | ||
.setName(spanName) | ||
.setTimeUnixNano(endTimeInNanos) | ||
.addAttributes(kvBuilder)) | ||
.addLinks(Span.Link.newBuilder() | ||
.setTraceId(ByteString.copyFrom(traceId)) | ||
.setSpanId(ByteString.copyFrom(spanId)) | ||
.setTraceState("test state") | ||
.addAttributes(kvBuilder)) | ||
.build() | ||
) | ||
.build() | ||
) | ||
.build(); | ||
} | ||
|
||
public static byte[] getRandomBytes(int len) { | ||
byte[] bytes = new byte[len]; | ||
RANDOM.nextBytes(bytes); | ||
return bytes; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noob question, why are we using a block?