Skip to content

Commit

Permalink
Add virtual threads support (#224)
Browse files Browse the repository at this point in the history
* (tmp)

* (tmp)

* tmp

* ReentrantLock version

* Use gradle 8.5 and java 21

* Eliminate synchronized in ProcessingContextImpl

* Revert "ReentrantLock version"

This reverts commit b03e678478cf4600add4f941519a308200bb7e64.

* Introduce SubPartitionRuntime to enable switching pthread and vthread

* Support SubPartitionRuntime in benchmark mode

* (cleanup) no longer used jvm flags

* Eliminate use of synchronized

* more eliminate use of synchronized

* Add it for vthread mode

* Revert source/target compatibility to java8

* no longer need enable-preview

* garbage

* Follow up work

* Use just Java 21 or higher to build project

* Handle TODOs

* Add integration test for Vthread runtime

* Add support to split I/O simulation latency into many in benchmark

* More understandable API

* Try loading async-profiler before warmup so that it won't cause deoptimization later

* Preload AP but record only the actual execution part

* (remove) disable metrics once

* ThreadPoolSubpartitions should take care of thread_pool runtime specific metrics

* Run benchmark with fixed heapsize

* Avoid stream API for better performance

* Benchmark use PROVIDED scope processor

* (fixup) reduce cost of cleanup

* Explicitly set -server option for benchmark

* Use ConcurrentHashMap.newKeySet

* (fixup) apply feedback

* (fixup) nit

* (fixup) apply feedback for scheduled executor
  • Loading branch information
kawamuray authored Mar 6, 2024
1 parent 2724000 commit 83be0df
Show file tree
Hide file tree
Showing 63 changed files with 1,175 additions and 574 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
strategy:
fail-fast: false
matrix:
java: [8, 11, 17, 20]
java: [21]
steps:
- uses: actions/checkout@v2
- name: Setup java
Expand Down
2 changes: 1 addition & 1 deletion benchmark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ dependencies {
// To serialize java.time.Duration
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jacksonVersion"

runtimeOnly "ch.qos.logback:logback-classic:1.2.12"
runtimeOnly "ch.qos.logback:logback-classic:1.4.11"
}
2 changes: 1 addition & 1 deletion benchmark/debm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ if [[ "$*" == *--taskstats* ]] && [[ "$*" != *--taskstats-bin* ]] && ! which jta
extra_opts="$extra_opts --taskstats-bin=$file"
fi

exec java -XX:+UseG1GC -cp "$classpath" com.linecorp.decaton.benchmark.Main $extra_opts "$@"
exec java -server -Xmx8g -Xcomp -XX:+UseG1GC -cp "$classpath" com.linecorp.decaton.benchmark.Main $extra_opts "$@"
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public class BenchmarkConfig {
* Latency to simulate as processing duration.
*/
int simulateLatencyMs;
/**
* Count to repeat the simulateLatencyMs.
*/
int latencyCount;
/**
* Optional bootstrap.servers to specify the cluster to use for testing. Otherwise local embedded cluster is
* used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@

import org.apache.kafka.clients.consumer.ConsumerConfig;

import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.processor.metrics.Metrics;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;
import com.linecorp.decaton.processor.runtime.Property;
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
import com.linecorp.decaton.processor.runtime.SubPartitionRuntime;
import com.linecorp.decaton.processor.runtime.SubscriptionBuilder;
import com.linecorp.decaton.processor.runtime.SubscriptionStateListener;
import com.linecorp.decaton.processor.runtime.TaskExtractor;
import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.processor.runtime.ProcessorScope;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.processor.runtime.SubscriptionBuilder;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.logging.LoggingMeterRegistry;
Expand Down Expand Up @@ -71,13 +71,18 @@ public void init(Config config, Recording recording, ResourceTracker resourceTra
// value than zero with the default "latest" reset policy.
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

SubPartitionRuntime subPartitionRuntime = SubPartitionRuntime.THREAD_POOL;
List<Property<?>> properties = new ArrayList<>();
for (Map.Entry<String, String> entry : config.parameters().entrySet()) {
String name = entry.getKey();
Function<String, Object> ctor = propertyConstructors.get(name);
Object value = ctor.apply(entry.getValue());
Property<?> prop = ProcessorProperties.propertyForName(name, value);
properties.add(prop);
if ("decaton.subpartition.runtime".equals(name)) {
subPartitionRuntime = SubPartitionRuntime.valueOf(entry.getValue());
} else {
Function<String, Object> ctor = propertyConstructors.get(name);
Object value = ctor.apply(entry.getValue());
Property<?> prop = ProcessorProperties.propertyForName(name, value);
properties.add(prop);
}
}

registry = new LoggingMeterRegistry(new LoggingRegistryConfig() {
Expand All @@ -98,6 +103,7 @@ public String get(String key) {
.newBuilder("decaton-benchmark")
.consumerConfig(props)
.addProperties(StaticPropertySupplier.of(properties))
.subPartitionRuntime(subPartitionRuntime)
.processorsBuilder(
ProcessorsBuilder.consuming(config.topic(),
(TaskExtractor<Task>) bytes -> {
Expand All @@ -107,11 +113,10 @@ public String get(String key) {
TaskMetadata.builder().build(), task, bytes);
})
.thenProcess(
() -> (ctx, task) -> {
(ctx, task) -> {
resourceTracker.track(Thread.currentThread().getId());
recording.process(task);
},
ProcessorScope.THREAD))
}))
.stateListener(state -> {
if (state == SubscriptionStateListener.State.RUNNING) {
startLatch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public BenchmarkResult execute(Config config, Consumer<Stage> stageCallback) thr

log.info("Loading runner {}", bmConfig.runner());
Runner runner = Runner.fromClassName(bmConfig.runner());
Recording recording = new Recording(bmConfig.tasks(), bmConfig.warmupTasks());
Recording recording = new Recording(bmConfig.tasks(), bmConfig.warmupTasks(),
bmConfig.latencyCount());
ResourceTracker resourceTracker = new ResourceTracker();
log.info("Initializing runner {}", bmConfig.runner());

Expand All @@ -68,13 +69,15 @@ public BenchmarkResult execute(Config config, Consumer<Stage> stageCallback) thr
final Optional<Path> profilerOutput;
final Optional<Path> taskstatsOutput;
try {
profiling.start();
stageCallback.accept(Stage.READY_WARMUP);
if (!recording.awaitWarmupComplete(3, TimeUnit.MINUTES)) {
throw new RuntimeException("timeout on awaiting benchmark to complete");
}
if (!bmConfig.skipWaitingJIT()) {
awaitJITGetsSettled();
}
profiling.stop();

profiling.start();
JvmTracker jvmTracker = JvmTracker.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public final class Main implements Callable<Integer> {
defaultValue = "0")
private int simulateLatencyMs;

@Option(names = "--latency-count",
description = "The number of times to sleep for the latency to simulating multiple I/O",
defaultValue = "1")
private int latencyCount;

@Option(names = "--bootstrap-servers",
description = "Optional bootstrap.servers property. if supplied, the specified kafka cluster is used for benchmarking instead of local embedded clusters")
private String bootstrapServers;
Expand Down Expand Up @@ -144,6 +149,7 @@ public Integer call() throws Exception {
.tasks(tasks)
.warmupTasks(warmupTasks)
.simulateLatencyMs(simulateLatencyMs)
.latencyCount(latencyCount)
.bootstrapServers(bootstrapServers)
.params(params)
.skipWaitingJIT(skipWaitingJIT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ static class ExecutionRecord {
private final AtomicLong maxDeliveryTime = new AtomicLong();
private final AtomicLong totalDeliveryTime = new AtomicLong();
private final AtomicLong processCount = new AtomicLong();
private final int latencyCount;

public ExecutionRecord(int latencyCount) {
this.latencyCount = latencyCount;
}

void process(Task task, boolean warmup) {
if (!warmup) {
Expand All @@ -50,7 +55,9 @@ void process(Task task, boolean warmup) {
try {
int latency = task.getProcessLatency();
if (latency > 0) {
Thread.sleep(latency);
for (int i = 0; i < latencyCount; i++) {
Thread.sleep(latency);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -68,10 +75,10 @@ void process(Task task, boolean warmup) {
private volatile long startTimeMs;
private volatile long completeTimeMs;

public Recording(int tasks, int warmupTasks) {
public Recording(int tasks, int warmupTasks, int latencyCount) {
this.tasks = tasks;
this.warmupTasks = warmupTasks;
executionRecord = new ExecutionRecord();
executionRecord = new ExecutionRecord(latencyCount);
processedCount = new AtomicInteger();
warmupLatch = new CountDownLatch(1);
if (warmupTasks == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public void print(BenchmarkConfig config, OutputStream out, BenchmarkResult resu
pw.printf("# Runner: %s\n", config.runner());
pw.printf("# Tasks: %d (warmup: %d)\n", config.tasks(), config.warmupTasks());
pw.printf("# Simulated Latency(ms): %d\n", config.simulateLatencyMs());
pw.printf("# Latency Count: %d\n", config.latencyCount());
pw.printf("# Total Simulated Latency(ms): %d\n", config.simulateLatencyMs() * config.latencyCount());
for (Entry<String, String> e : config.params().entrySet()) {
pw.printf("# Param: %s=%s\n", e.getKey(), e.getValue());
}
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import org.gradle.api.tasks.testing.logging.TestLogEvent

plugins {
id "me.champeau.jmh" version "0.6.6" apply false
id 'com.github.johnrengelman.shadow' version '7.1.1' apply false
id 'com.github.johnrengelman.shadow' version '8.1.1' apply false
id 'io.freefair.lombok' version '8.2.2' apply false
}

Expand Down
1 change: 0 additions & 1 deletion centraldogma/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,4 @@ dependencies {
implementation "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"

testImplementation "com.linecorp.centraldogma:centraldogma-testing-junit4:$centralDogmaVersion"
testRuntimeOnly "ch.qos.logback:logback-classic:1.2.12"
}
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
14 changes: 7 additions & 7 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
case $MAX_FD in #(
max*)
# In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
# shellcheck disable=SC3045
# shellcheck disable=SC2039,SC3045
MAX_FD=$( ulimit -H -n ) ||
warn "Could not query maximum file descriptor limit"
esac
case $MAX_FD in #(
'' | soft) :;; #(
*)
# In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
# shellcheck disable=SC3045
# shellcheck disable=SC2039,SC3045
ulimit -n "$MAX_FD" ||
warn "Could not set maximum file descriptor limit to $MAX_FD"
esac
Expand Down Expand Up @@ -202,11 +202,11 @@ fi
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'

# Collect all arguments for the java command;
# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of
# shell script including quotes and variable substitutions, so put them in
# double quotes to make sure that they get re-expanded; and
# * put everything else in single quotes, so that it's not re-expanded.
# Collect all arguments for the java command:
# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
# and any embedded shellness will be escaped.
# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
# treated as '${Hostname}' itself on the command line.

set -- \
"-Dorg.gradle.appname=$APP_BASE_NAME" \
Expand Down
1 change: 0 additions & 1 deletion processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ dependencies {
implementation "org.slf4j:slf4j-api:$slf4jVersion"

testImplementation project(":protobuf")
testRuntimeOnly "ch.qos.logback:logback-classic:1.2.12"

testImplementation "org.hamcrest:hamcrest:$hamcrestVersion"
testFixturesImplementation "org.slf4j:slf4j-api:$slf4jVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand All @@ -33,11 +33,11 @@
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.linecorp.decaton.processor.internal.HashableByteArray;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.ProcessorScope;
import com.linecorp.decaton.processor.runtime.Property;
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
import com.linecorp.decaton.processor.internal.HashableByteArray;
import com.linecorp.decaton.testing.KafkaClusterExtension;
import com.linecorp.decaton.testing.RandomExtension;
import com.linecorp.decaton.testing.processor.ProcessedRecord;
Expand Down Expand Up @@ -163,16 +163,16 @@ public void testSingleThreadProcessing() throws Exception {
// Note that this processing semantics is not to be considered part of the Decaton specification which users can rely on.
// Rather, this is just the expected behavior based on the current implementation when we set concurrency to 1.
ProcessingGuarantee noDuplicates = new ProcessingGuarantee() {
private final Map<HashableByteArray, List<TestTask>> produced = new HashMap<>();
private final Map<HashableByteArray, List<TestTask>> processed = new HashMap<>();
private final ConcurrentMap<HashableByteArray, List<TestTask>> produced = new ConcurrentHashMap<>();
private final ConcurrentMap<HashableByteArray, List<TestTask>> processed = new ConcurrentHashMap<>();

@Override
public synchronized void onProduce(ProducedRecord record) {
public void onProduce(ProducedRecord record) {
produced.computeIfAbsent(new HashableByteArray(record.key()), key -> new ArrayList<>()).add(record.task());
}

@Override
public synchronized void onProcess(TaskMetadata metadata, ProcessedRecord record) {
public void onProcess(TaskMetadata metadata, ProcessedRecord record) {
processed.computeIfAbsent(new HashableByteArray(record.key()), key -> new ArrayList<>()).add(record.task());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -66,7 +66,7 @@ public void testPropertyDynamicSwitch() throws Exception {
for (int i = 0; i < 10000; i++) {
keys.add("key" + i);
}
Set<HashableByteArray> processedKeys = Collections.synchronizedSet(new HashSet<>());
Set<HashableByteArray> processedKeys = ConcurrentHashMap.newKeySet();
CountDownLatch processLatch = new CountDownLatch(keys.size());

DecatonProcessor<HelloTask> processor = (context, task) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -65,7 +65,7 @@ public void testPropertyDynamicSwitch() throws Exception {
for (int i = 0; i < 10000; i++) {
keys.add("key" + i);
}
Set<HashableByteArray> processedKeys = Collections.synchronizedSet(new HashSet<>());
Set<HashableByteArray> processedKeys = ConcurrentHashMap.newKeySet();
CountDownLatch processLatch = new CountDownLatch(keys.size());

DecatonProcessor<HelloTask> processor = (context, task) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -79,8 +78,8 @@ public void tearDown() {
}

private static class ProcessRetriedTask implements ProcessingGuarantee {
private final Set<String> producedIds = Collections.synchronizedSet(new HashSet<>());
private final Set<String> processedIds = Collections.synchronizedSet(new HashSet<>());
private final Set<String> producedIds = ConcurrentHashMap.newKeySet();
private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

@Override
public void onProduce(ProducedRecord record) {
Expand All @@ -96,8 +95,13 @@ public void onProcess(TaskMetadata metadata, ProcessedRecord record) {

@Override
public void doAssert() {
TestUtils.awaitCondition("all retried tasks must be processed",
() -> producedIds.size() == processedIds.size());
try {
TestUtils.awaitCondition("all retried tasks must be processed",
() -> producedIds.size() == processedIds.size());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}

Expand Down
Loading

0 comments on commit 83be0df

Please sign in to comment.