diff --git a/build.gradle b/build.gradle index b211b08b17..eb503d1a96 100644 --- a/build.gradle +++ b/build.gradle @@ -2232,6 +2232,7 @@ project(':tools') { // AutoMQ inject start implementation project(':automq-shell') implementation libs.kafkaAvroSerializer + implementation libs.bucket4j // AutoMQ inject end implementation project(':storage') diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java b/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java index 300ebb6d79..ca9456232e 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java @@ -106,7 +106,7 @@ private void run() { LOGGER.info("Creating consumers..."); int consumers = consumerService.createConsumers(topics, config.consumersConfig()); - consumerService.start(this::messageReceived); + consumerService.start(this::messageReceived, config.maxPollRate); LOGGER.info("Created {} consumers, took {} ms", consumers, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS)); LOGGER.info("Creating producers..."); diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java index 81627ee4f8..77fc927dbb 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java @@ -11,6 +11,8 @@ package org.apache.kafka.tools.automq.perf; +import io.github.bucket4j.BlockingBucket; +import io.github.bucket4j.Bucket; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult; @@ -91,10 +93,15 @@ public int createConsumers(List topics, ConsumersConfig config) { return count; } - public void start(ConsumerCallback callback) { + public void start(ConsumerCallback callback, int pollRate) { + BlockingBucket bucket = rateLimitBucket(pollRate); + ConsumerCallback callbackWithRateLimit = (tp, p, st) -> { + callback.messageReceived(tp, p, st); + bucket.consume(1); + }; CompletableFuture.allOf( groups.stream() - .map(group -> group.start(callback)) + .map(group -> group.start(callbackWithRateLimit)) .toArray(CompletableFuture[]::new) ).join(); } @@ -122,6 +129,15 @@ public int consumerCount() { .sum(); } + private BlockingBucket rateLimitBucket(int rateLimit) { + return Bucket.builder() + .addLimit(limit -> limit + .capacity(rateLimit / 10) + .refillGreedy(rateLimit, Duration.ofSeconds(1)) + ).build() + .asBlocking(); + } + @Override public void close() { admin.close(); @@ -137,7 +153,7 @@ public interface ConsumerCallback { * @param payload the received message payload * @param sendTimeNanos the time in nanoseconds when the message was sent */ - void messageReceived(TopicPartition topicPartition, byte[] payload, long sendTimeNanos); + void messageReceived(TopicPartition topicPartition, byte[] payload, long sendTimeNanos) throws InterruptedException; } public static class ConsumersConfig { @@ -316,7 +332,7 @@ private void pollRecords(KafkaConsumer consumer, ConsumerCallbac TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); callback.messageReceived(topicPartition, record.value(), sendTimeNanos); } - } catch (InterruptException e) { + } catch (InterruptException | InterruptedException e) { // ignore, as we are closing } catch (Exception e) { LOGGER.warn("exception occur while consuming message", e); diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java index a56f2af514..b260c6a215 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java @@ -31,6 +31,7 @@ import java.util.concurrent.ThreadLocalRandom; import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; +import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.between; import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.nonNegativeInteger; import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.notLessThan; import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.positiveInteger; @@ -53,6 +54,7 @@ public class PerfConfig { public final int randomPoolSize; public final int sendRate; public final int sendRateDuringCatchup; + public final int maxPollRate; public final int backlogDurationSeconds; public final int groupStartDelaySeconds; public final int warmupDurationMinutes; @@ -92,6 +94,7 @@ public PerfConfig(String[] args) { randomPoolSize = ns.getInt("randomPoolSize"); sendRate = ns.getInt("sendRate"); sendRateDuringCatchup = ns.getInt("sendRateDuringCatchup") == null ? sendRate : ns.getInt("sendRateDuringCatchup"); + maxPollRate = ns.getInt("maxPollRate"); backlogDurationSeconds = ns.getInt("backlogDurationSeconds"); groupStartDelaySeconds = ns.getInt("groupStartDelaySeconds"); warmupDurationMinutes = ns.getInt("warmupDurationMinutes"); @@ -209,6 +212,12 @@ public static ArgumentParser parser() { .dest("sendRateDuringCatchup") .metavar("SEND_RATE_DURING_CATCHUP") .help("The send rate in messages per second during catchup. If not set, the send rate will be used."); + parser.addArgument("-m", "--max-poll-rate") + .setDefault(1_000_000_000) + .type(between(0, 1_000_000_000)) + .dest("maxPollRate") + .metavar("MAX_POLL_RATE") + .help("The max poll rate in messages per second."); parser.addArgument("-b", "--backlog-duration") .setDefault(0) .type(notLessThan(300)) @@ -351,6 +360,10 @@ public static IntegerArgumentType positiveInteger() { public static IntegerArgumentType notLessThan(int min) { return new IntegerArgumentType(value -> value < min ? "expected an integer not less than " + min + ", but got " + value : null); } + + public static IntegerArgumentType between(int min, int max) { + return new IntegerArgumentType(value -> value < min || value > max ? "expected an integer between " + min + " and " + max + ", but got " + value : null); + } } @FunctionalInterface